2 changes: 1 addition & 1 deletion docs/api-reference/scheduler-api.md
@@ -124,7 +124,7 @@ _Appears in:_
| --- | --- | --- | --- |
| `name` _string_ | Name is the name of the PodGroup. | | |
| `podReferences` _[NamespacedName](#namespacedname) array_ | PodReferences is a list of references to the Pods that are part of this group. | | |
| `minReplicas` _integer_ | MinReplicas is the number of replicas that needs to be gang scheduled.<br />If the MinReplicas is greater than len(PodReferences) then scheduler makes the best effort to schedule as many pods beyond<br />MinReplicas. However, guaranteed gang scheduling is only provided for MinReplicas. | | |
| `minReplicas` _integer_ | MinReplicas is the number of replicas that needs to be gang scheduled.<br />If the MinReplicas is lesser than len(PodReferences) then scheduler makes the best effort to schedule as many pods beyond<br />MinReplicas. However, guaranteed gang scheduling is only provided for MinReplicas. | | |
| `topologyConstraint` _[TopologyConstraint](#topologyconstraint)_ | TopologyConstraint defines topology packing constraints for this PodGroup.<br />Enables PodClique-level topology constraints.<br />Updated by operator when PodClique topology constraints change. | | |
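For illustration, here is a minimal sketch of a PodGroup in which gang scheduling is guaranteed only for MinReplicas while the remaining pods are placed on a best-effort basis. The Go type and field names are inferred from the table above, not taken from the actual scheduler API package.

// Hypothetical sketch: type and field names are assumed from the documented API table.
podGroup := PodGroupSpec{
	Name: "decode-workers",
	PodReferences: []NamespacedName{
		{Namespace: "default", Name: "decode-0"},
		{Namespace: "default", Name: "decode-1"},
		{Namespace: "default", Name: "decode-2"},
		{Namespace: "default", Name: "decode-3"},
	},
	// Gang scheduling is guaranteed only for the first two replicas; the
	// scheduler makes a best effort to place the remaining two pods.
	MinReplicas: 2,
}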


2 changes: 1 addition & 1 deletion operator/charts/values.yaml
@@ -78,7 +78,7 @@ config:
# - domain: numa
# key: <provider-specific-numa-node-label-key>
authorizer:
enabled: false
enabled: true
exemptServiceAccountUserNames:
- system:serviceaccount:kube-system:generic-garbage-collector
- system:serviceaccount:kube-system:horizontal-pod-autoscaler
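With the authorizer now enabled by default, requests from these exempt service accounts bypass the protection. What follows is a minimal sketch of how such an exemption check could look, assuming only that the webhook compares the admission request's username against the configured list; it is not the operator's actual implementation.

import admissionv1 "k8s.io/api/admission/v1"

// exemptUsers mirrors authorizer.exemptServiceAccountUserNames above.
var exemptUsers = map[string]struct{}{
	"system:serviceaccount:kube-system:generic-garbage-collector": {},
	"system:serviceaccount:kube-system:horizontal-pod-autoscaler": {},
}

// isExempt reports whether the requesting user bypasses the authorizer check.
func isExempt(req admissionv1.AdmissionRequest) bool {
	_, ok := exemptUsers[req.UserInfo.Username]
	return ok
}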
3 changes: 1 addition & 2 deletions operator/e2e/cmd/setup-debug-cluster/main.go
@@ -146,7 +146,7 @@ func run(cli *CLI) error {
// Set up the cluster
logger.Info("🚀 Setting up K3D cluster with Grove operator...")

_, cleanup, err := setup.SetupCompleteK3DCluster(runCtx, cfg, skaffoldPath, logger)
_, cleanup, err := setup.CreateK3DClusterWithComponents(runCtx, cfg, skaffoldPath, logger)
if err != nil {
logger.Errorf("Failed to setup K3D cluster: %v", err)
if cleanup != nil {
@@ -341,4 +341,3 @@ func mergeKubeconfigs(existing, new *clientcmdapi.Config, _ string) *clientcmdap

return merged
}

3 changes: 2 additions & 1 deletion operator/e2e/setup/constants.go
@@ -20,8 +20,9 @@ const (
// OperatorNamespace is the namespace where the Grove operator is deployed for E2E tests.
// This is used during installation (via Skaffold) and for finding operator pods during diagnostics.
OperatorNamespace = "grove-system"

// OperatorDeploymentName is the name of the operator deployment (also the Helm release name).
// This is used to find operator pods for log collection during test failures.
OperatorDeploymentName = "grove-operator"
// ServiceAccountName is the name of the ServiceAccount used by the Grove operator.
ServiceAccountName = "grove-operator"
)
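As a side note, the username that admission and authorization plugins see for this ServiceAccount follows the standard Kubernetes convention and can be derived from the two constants above; a small illustrative sketch (not code from this repository):

// Resolves to "system:serviceaccount:grove-system:grove-operator".
operatorUserName := fmt.Sprintf("system:serviceaccount:%s:%s", OperatorNamespace, ServiceAccountName)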
137 changes: 102 additions & 35 deletions operator/e2e/setup/k8s_clusters.go
@@ -43,6 +43,7 @@ import (
k3d "github.com/k3d-io/k3d/v5/pkg/types"
"github.com/samber/lo"
"github.com/sirupsen/logrus"
authenticationv1 "k8s.io/api/authentication/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -52,6 +53,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/utils/ptr"
"sigs.k8s.io/yaml"
)

@@ -60,6 +62,8 @@ const (
defaultPollTimeout = 5 * time.Minute
// defaultPollInterval is the interval for most polling conditions
defaultPollInterval = 5 * time.Second
// defaultOperatorSATokenExpirationSeconds is the expiration time for a bearer token generated using service account used by Grove operator.
defaultOperatorSATokenExpirationSeconds = int64(10800) // 3 hours - more than sufficient for e2e tests
)

// transientErrors contains error substrings that indicate a webhook is not ready yet
@@ -257,9 +261,9 @@ func ensureRegistryDoesNotExist(ctx context.Context, registryPort string, logger
return nil
}

// SetupCompleteK3DCluster creates a complete k3d cluster with Grove, Kai Scheduler, and NVIDIA GPU Operator
func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAMLPath string, logger *utils.Logger) (*rest.Config, func(), error) {
restConfig, cleanup, err := SetupK3DCluster(ctx, cfg, logger)
// CreateK3DClusterWithComponents creates a complete k3d cluster with Grove, Kai Scheduler components.
func CreateK3DClusterWithComponents(ctx context.Context, cfg ClusterConfig, skaffoldYAMLPath string, logger *utils.Logger) (*rest.Config, func(), error) {
restConfig, cleanup, err := CreateK3DCluster(ctx, cfg, logger)
if err != nil {
return nil, cleanup, err
}
@@ -286,16 +290,16 @@ func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAM
if err != nil {
return nil, enhancedCleanup, fmt.Errorf("failed to load dependencies: %w", err)
}
imagesToPrepull := deps.GetImagesToPrePull()
imagesToPrePull := deps.GetImagesToPrePull()

// Pre-pull and push images to local registry if enabled
if cfg.EnableRegistry {
if err := prepullImages(ctx, imagesToPrepull, cfg.RegistryPort, logger); err != nil {
if err := prePullImages(ctx, imagesToPrePull, cfg.RegistryPort, logger); err != nil {
logger.Warnf("⚠️ Failed to pre-load images (cluster will still work, but slower startup): %v", err)
// Don't fail the cluster creation if image pre-loading fails - cluster will still work
} else {
// Verify images are accessible from the registry
if err := verifyRegistryImages(imagesToPrepull, cfg.RegistryPort, logger); err != nil {
if err := verifyRegistryImages(imagesToPrePull, cfg.RegistryPort, logger); err != nil {
logger.Warnf("⚠️ Registry images verification failed: %v", err)
}
}
@@ -335,53 +339,46 @@ func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAM
}

logger.Info("🚀 Installing Grove, Kai Scheduler...")
if err := InstallCoreComponents(ctx, restConfig, kaiConfig, skaffoldYAMLPath, cfg.RegistryPort, logger); err != nil {
if err = InstallCoreComponents(ctx, restConfig, kaiConfig, skaffoldYAMLPath, cfg.RegistryPort, logger); err != nil {
cleanup()
return nil, nil, fmt.Errorf("component installation failed: %w", err)
}

// Wait for Grove pods to be ready (0 = skip count validation)
if err := utils.WaitForPodsInNamespace(ctx, OperatorNamespace, restConfig, 0, defaultPollTimeout, defaultPollInterval, logger); err != nil {
if err = utils.WaitForPodsInNamespace(ctx, OperatorNamespace, restConfig, 0, defaultPollTimeout, defaultPollInterval, logger); err != nil {
cleanup()

return nil, nil, fmt.Errorf("grove pods not ready: %w", err)
}

// Wait for Kai Scheduler pods to be ready (0 = skip count validation)
if err := utils.WaitForPodsInNamespace(ctx, kaiConfig.Namespace, kaiConfig.RestConfig, 0, defaultPollTimeout, defaultPollInterval, kaiConfig.Logger); err != nil {
if err = utils.WaitForPodsInNamespace(ctx, kaiConfig.Namespace, kaiConfig.RestConfig, 0, defaultPollTimeout, defaultPollInterval, kaiConfig.Logger); err != nil {
cleanup()
return nil, nil, fmt.Errorf("kai scheduler pods not ready: %w", err)
}

// Wait for the Kai CRDs to be available (before creating queues)
if err := WaitForKaiCRDs(ctx, kaiConfig); err != nil {
if err = WaitForKaiCRDs(ctx, kaiConfig); err != nil {
cleanup()
return nil, nil, fmt.Errorf("failed to wait for Kai CRDs: %w", err)
}

// need to create the default Kai queues
if err := CreateDefaultKaiQueues(ctx, kaiConfig); err != nil {
if err = CreateDefaultKaiQueues(ctx, kaiConfig); err != nil {
cleanup()
return nil, nil, fmt.Errorf("failed to create default Kai queue: %w", err)
}

// Nvidia Operator seems to take the longest to be ready, so we wait for it last
// to get the most done while waiting. (0 = skip count validation)
//if err := utils.WaitForPodsInNamespace(ctx, gpuOperatorConfig.Namespace, gpuOperatorConfig.RestConfig, 0, defaultPollTimeout, defaultPollInterval, gpuOperatorConfig.Logger); err != nil {
// cleanup()
// return nil, nil, fmt.Errorf("NVIDIA GPU Operator not ready: %w", err)
//}

// Wait for Grove webhook to be ready by actually testing it
// This ensures the webhook is ready to handle requests before tests start
if err := waitForWebhookReady(ctx, restConfig, logger); err != nil {
if err = waitForWebhookReady(ctx, restConfig, logger); err != nil {
cleanup()
return nil, nil, fmt.Errorf("grove webhook not ready: %w", err)
}

// Find any deployed images that are not in the prepull list
namespacesToCheck := []string{kaiConfig.Namespace /*gpuOperatorConfig.Namespace*/}
missingImages := findMissingImages(ctx, clientset, namespacesToCheck, imagesToPrepull, logger)
missingImages := findMissingImages(ctx, clientset, namespacesToCheck, imagesToPrePull, logger)

// Warn about missing images to help keep the prepull list up to date
if len(missingImages) > 0 {
@@ -396,8 +393,9 @@ func SetupCompleteK3DCluster(ctx context.Context, cfg ClusterConfig, skaffoldYAM
return restConfig, enhancedCleanup, nil
}

// SetupK3DCluster creates a k3d cluster and returns a REST config
func SetupK3DCluster(ctx context.Context, cfg ClusterConfig, logger *utils.Logger) (*rest.Config, func(), error) {
// CreateK3DCluster creates a k3d cluster and returns a REST config, a cleanup function which will clean up the cluster, and an error if any.
// It is the responsibility of the caller to call the cleanup function when done.
func CreateK3DCluster(ctx context.Context, cfg ClusterConfig, logger *utils.Logger) (*rest.Config, func(), error) {
// k3d is very verbose, we don't want the INFO level logs unless the logger is set to DEBUG
// k3d uses logrus internally, so we need to convert our level to logrus level
if logger.GetLevel() == utils.DebugLevel {
@@ -419,13 +417,20 @@ func SetupK3DCluster(ctx context.Context, cfg ClusterConfig, logger *utils.Logge
Host: "0.0.0.0",
HostPort: cfg.HostPort,
},
Ports: []v1alpha5.PortWithNodeFilters{
{
Port: cfg.LoadBalancerPort,
NodeFilters: []string{"loadbalancer"},
},
},
Options: v1alpha5.SimpleConfigOptions{
// K3D installs traefik load balancer by default, we disable it since its not needed for e2e tests.
K3dOptions: v1alpha5.SimpleConfigOptionsK3d{
DisableLoadbalancer: true,
},
K3sOptions: v1alpha5.SimpleConfigOptionsK3s{
ExtraArgs: []v1alpha5.K3sArgWithNodeFilters{
{
// Disable Traefik ingress controller at the K3s level
Arg: "--disable=traefik",
NodeFilters: []string{"server:*"},
},
},
},
Runtime: v1alpha5.SimpleConfigOptionsRuntime{
// worker node memory
AgentsMemory: cfg.WorkerMemory,
@@ -493,7 +498,7 @@ configs:
return nil, nil, fmt.Errorf("failed to transform config: %w", err)
}

if err := ensureClusterDoesNotExist(ctx, k3dConfig.Name, logger); err != nil {
if err = ensureClusterDoesNotExist(ctx, k3dConfig.Name, logger); err != nil {
return nil, nil, err
}

@@ -510,7 +515,7 @@ configs:
cleanupCtx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
logger.Debug("🗑️ Deleting cluster...")
if err := k3dclient.ClusterDelete(cleanupCtx, runtimes.Docker, &k3dConfig.Cluster, k3d.ClusterDeleteOpts{}); err != nil {
if err = k3dclient.ClusterDelete(cleanupCtx, runtimes.Docker, &k3dConfig.Cluster, k3d.ClusterDeleteOpts{}); err != nil {
logger.Errorf("Failed to delete cluster: %v", err)
} else {
logger.Info("✅ Cluster deleted successfully")
@@ -559,7 +564,7 @@ configs:
}

// Check if it was a timeout
if attemptCtx.Err() == context.DeadlineExceeded {
if errors.Is(attemptCtx.Err(), context.DeadlineExceeded) {
logger.Errorf("❌ Cluster creation timed out after %v (attempt %d/%d)", clusterCreateTimeout, attempt, maxClusterCreateRetries)
} else {
logger.Errorf("❌ Cluster creation failed (attempt %d/%d): %v", attempt, maxClusterCreateRetries, createErr)
@@ -870,10 +875,10 @@ func restartNodeContainer(ctx context.Context, nodeName string, logger *utils.Lo
return nil
}

// prepullImages pre-pulls container images and pushes them to the local k3d registry
// prePullImages pre-pulls container images and pushes them to the local k3d registry
// This significantly speeds up cluster startup by avoiding image pull delays during pod creation.
// Note: Using the registry is much faster than k3d's image import which creates tar archives.
func prepullImages(ctx context.Context, images []string, registryPort string, logger *utils.Logger) error {
func prePullImages(ctx context.Context, images []string, registryPort string, logger *utils.Logger) error {
logger.Info("📦 Pre-pulling and pushing container images to local registry...")

// Create Docker client
@@ -974,7 +979,7 @@ func verifyRegistryImages(images []string, registryPort string, logger *utils.Lo
logger.Debug("🔍 Verifying images are in registry...")

for _, img := range images {
// Extract the image path WITHOUT the registry domain (same logic as prepullImages)
// Extract the image path WITHOUT the registry domain (same logic as prePullImages)
imagePath := stripRegistryDomain(img)

// Try to pull from local registry to verify it exists
@@ -1110,7 +1115,69 @@ func buildLDFlagsForE2E() string {
return ldflags
}

// waitForWebhookReady waits for the webhook server to be ready.
// GetOperatorDynamicClient creates a dynamic client using the Grove operator's service account credentials.
// This should be used by e2e tests when they need to modify managed resources (like PCLQ, PCSG, Pods managed by Grove etc.) that are protected
// by the authorizer webhook. The authorizer allows the operator's service account to modify these resources.
func GetOperatorDynamicClient(ctx context.Context, adminRestConfig *rest.Config, logger *utils.Logger) (dynamic.Interface, error) {
operatorRestConfig, err := getOperatorServiceAccountRestConfig(ctx, adminRestConfig, logger)
if err != nil {
return nil, fmt.Errorf("failed to get operator REST config: %w", err)
}

dynamicClient, err := dynamic.NewForConfig(operatorRestConfig)
if err != nil {
return nil, fmt.Errorf("failed to create dynamic client: %w", err)
}
return dynamicClient, nil
}

// getOperatorServiceAccountRestConfig creates a REST config using the Grove operator's service account.
// It leverages the TokenRequest API to obtain a short-lived token for authentication.
func getOperatorServiceAccountRestConfig(ctx context.Context, adminRestConfig *rest.Config, logger *utils.Logger) (operatorRestConfig *rest.Config, err error) {
logger.Debug("🔑 Creating REST config using Grove operator service account...")
var k8sClient *kubernetes.Clientset

k8sClient, err = kubernetes.NewForConfig(adminRestConfig)
if err != nil {
err = fmt.Errorf("failed to create K8s client: %w", err)
return
}

// Verify the service account exists
if _, err = k8sClient.CoreV1().ServiceAccounts(OperatorNamespace).Get(ctx, ServiceAccountName, metav1.GetOptions{}); err != nil {
return nil, fmt.Errorf("failed to get service account %s in namespace %s: %w", ServiceAccountName, OperatorNamespace, err)
}

// Use TokenRequest API to create a token (3 hour expiration)
// This is the proper approach for Kubernetes 1.24+ where token secrets are no longer auto-created
tokenReq := &authenticationv1.TokenRequest{
Spec: authenticationv1.TokenRequestSpec{
ExpirationSeconds: ptr.To(defaultOperatorSATokenExpirationSeconds),
},
}
tokenReq, err = k8sClient.CoreV1().ServiceAccounts(OperatorNamespace).CreateToken(ctx, ServiceAccountName, tokenReq, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create token for service account %s: %w", ServiceAccountName, err)
}

if tokenReq.Status.Token == "" {
return nil, fmt.Errorf("token request returned empty token for service account %s", ServiceAccountName)
}

// Create Operator RestConfig by making a copy of the admin RestConfig and overriding the bearer token.
operatorRestConfig = rest.CopyConfig(adminRestConfig)
operatorRestConfig.BearerToken = tokenReq.Status.Token
// Clear cert data since we're using token auth
operatorRestConfig.CertData = nil
operatorRestConfig.KeyData = nil
operatorRestConfig.CertFile = ""
operatorRestConfig.KeyFile = ""

logger.Debug("✅ Created REST config using operator service account token (valid for 3 hours)")
return
}

// waitForWebhookReady waits for the Grove webhook to be ready by testing it.
// This ensures the webhook server is fully registered with the Kubernetes API server
// and can handle admission requests before tests start.
// We do this by making a dry-run request to create a PodCliqueSet using workload1.yaml -
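For tests that must mutate Grove-managed resources, a hedged usage sketch of the new GetOperatorDynamicClient helper follows. The GroupVersionResource, namespace, and object name are placeholders for illustration and are not taken from this PR.

// Build a dynamic client authenticated as the operator's service account, then update a
// resource that the authorizer webhook would reject if modified with admin credentials.
operatorClient, err := setup.GetOperatorDynamicClient(ctx, adminRestConfig, logger)
if err != nil {
	return fmt.Errorf("failed to build operator-scoped client: %w", err)
}

// Placeholder GVR and names, for illustration only.
pclqGVR := schema.GroupVersionResource{Group: "grove.io", Version: "v1alpha1", Resource: "podcliques"}
obj, err := operatorClient.Resource(pclqGVR).Namespace("default").Get(ctx, "example-pclq", metav1.GetOptions{})
if err != nil {
	return err
}
// ... mutate obj as the test requires ...
if _, err = operatorClient.Resource(pclqGVR).Namespace("default").Update(ctx, obj, metav1.UpdateOptions{}); err != nil {
	return err
}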
4 changes: 2 additions & 2 deletions operator/e2e/setup/shared_cluster.go
@@ -116,14 +116,14 @@ func (scm *SharedClusterManager) Setup(ctx context.Context, testImages []string)
// Use the centralized cluster config with overrides for shared test cluster
customCfg := DefaultClusterConfig()
customCfg.Name = "shared-e2e-test-cluster"
customCfg.HostPort = "6560" // Use a different port to avoid conflicts
customCfg.HostPort = "6560" // Use a different port to avoid conflicts
customCfg.LoadBalancerPort = "8090:80"

scm.registryPort = customCfg.RegistryPort

scm.logger.Info("🚀 Setting up shared k3d cluster for all e2e tests...")

restConfig, cleanup, err := SetupCompleteK3DCluster(ctx, customCfg, relativeSkaffoldYAMLPath, scm.logger)
restConfig, cleanup, err := CreateK3DClusterWithComponents(ctx, customCfg, relativeSkaffoldYAMLPath, scm.logger)
// Defer cleanup on error - only call if setup was not successful and we have a cleanup function
defer func() {
if !setupSuccessful && cleanup != nil {
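The deferred guard shown above follows a common setup pattern: clean up only when setup fails, otherwise hand the cluster to the tests. A minimal sketch of the full shape, assuming setupSuccessful is flipped to true once Setup completes (the restConfig field name is an assumption):

setupSuccessful := false
restConfig, cleanup, err := CreateK3DClusterWithComponents(ctx, customCfg, relativeSkaffoldYAMLPath, scm.logger)
defer func() {
	// Tear the cluster down only if setup did not finish; on success the shared
	// cluster is kept for the remaining e2e tests and cleaned up later.
	if !setupSuccessful && cleanup != nil {
		cleanup()
	}
}()
if err != nil {
	return fmt.Errorf("failed to create shared e2e cluster: %w", err)
}
scm.restConfig = restConfig // stored for the tests; field name assumed
// ... wait for components, load test images, etc. ...
setupSuccessful = true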
6 changes: 3 additions & 3 deletions operator/e2e/tests/debug_utils.go
@@ -186,14 +186,14 @@ func dumpGroveResources(tc TestContext) {
logger.Info("=== GROVE RESOURCES ===")
logger.Info("================================================================================")

if tc.DynamicClient == nil {
logger.Info("[DIAG] DynamicClient is nil, cannot list Grove resources")
if tc.AdminDynamicClient == nil {
logger.Info("[DIAG] AdminDynamicClient is nil, cannot list Grove resources")
return
}

for _, rt := range groveResourceTypes {
logger.Infof("[DIAG] Listing %s in namespace %s...", rt.name, tc.Namespace)
resources, err := tc.DynamicClient.Resource(rt.gvr).Namespace(tc.Namespace).List(tc.Ctx, metav1.ListOptions{})
resources, err := tc.AdminDynamicClient.Resource(rt.gvr).Namespace(tc.Namespace).List(tc.Ctx, metav1.ListOptions{})
if err != nil {
logger.Infof("[DIAG] Failed to list %s: %v", rt.name, err)
continue
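The rename from DynamicClient to AdminDynamicClient suggests the test context now distinguishes the admin-scoped client used for diagnostics from an operator-scoped client used for protected mutations. A speculative sketch of that shape; only the fields visible in this diff are confirmed, and the real struct layout may differ.

// Hypothetical shape of TestContext; fields beyond those used in this diff are assumptions.
type TestContext struct {
	Ctx                context.Context
	Namespace          string
	AdminDynamicClient dynamic.Interface // cluster-admin credentials, used for diagnostics dumps
	// An operator-scoped client (see GetOperatorDynamicClient) would be used when a test
	// must mutate resources that the authorizer webhook protects.
}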