diff --git a/cmd/rbgs/main.go b/cmd/rbgs/main.go index 540cbfee..efd12f36 100644 --- a/cmd/rbgs/main.go +++ b/cmd/rbgs/main.go @@ -53,6 +53,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" workloadscontroller "sigs.k8s.io/rbgs/internal/controller/workloads" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils/fieldindex" "sigs.k8s.io/rbgs/version" // +kubebuilder:scaffold:imports @@ -101,6 +102,8 @@ func main() { // Controller runtime options maxConcurrentReconciles int cacheSyncTimeout time.Duration + // Gang scheduling scheduler name: scheduler-plugins or volcano + schedulerPlugin string ) flag.StringVar( &metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+ @@ -135,6 +138,11 @@ func main() { "The number of worker threads used by the the RBGS controller.", ) flag.DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 120*time.Second, "Informer cache sync timeout.") + flag.StringVar( + &schedulerPlugin, "scheduler-name", string(scheduler.KubeSchedulerPlugin), + "The scheduler name to use for gang scheduling. Supported values: scheduler-plugins, volcano. 
"+ + "Defaults to scheduler-plugins.", + ) flag.Parse() opts := zap.Options{ @@ -282,7 +290,7 @@ func main() { CacheSyncTimeout: cacheSyncTimeout, } - rbgReconciler := workloadscontroller.NewRoleBasedGroupReconciler(mgr) + rbgReconciler := workloadscontroller.NewRoleBasedGroupReconciler(mgr, scheduler.SchedulerPluginType(schedulerPlugin)) if err = rbgReconciler.CheckCrdExists(); err != nil { setupLog.Error(err, "unable to create rbg controller", "controller", "RoleBasedGroup") os.Exit(1) diff --git a/deploy/helm/rbgs/templates/manager.yaml b/deploy/helm/rbgs/templates/manager.yaml index 985a0728..7e42b8c7 100644 --- a/deploy/helm/rbgs/templates/manager.yaml +++ b/deploy/helm/rbgs/templates/manager.yaml @@ -32,6 +32,7 @@ spec: - --metrics-bind-address=:8443 - --leader-elect - --health-probe-bind-address=:8081 + - --scheduler-name={{ .Values.schedulerName | default "scheduler-plugins" }} command: - /manager securityContext: diff --git a/deploy/helm/rbgs/values.yaml b/deploy/helm/rbgs/values.yaml index e46be967..06840081 100644 --- a/deploy/helm/rbgs/values.yaml +++ b/deploy/helm/rbgs/values.yaml @@ -34,6 +34,11 @@ nodeSelector: {} tolerations: [] +# Gang scheduling scheduler name configuration. +# Supported values: scheduler-plugins, volcano +# Defaults to scheduler-plugins. 
+schedulerName: scheduler-plugins + crdUpgrade: # Whether to enable CRD Upgrader Job (runs before install/upgrade) enabled: true diff --git a/internal/controller/workloads/event.go b/internal/controller/workloads/event.go index 68baefcd..bae3ea63 100644 --- a/internal/controller/workloads/event.go +++ b/internal/controller/workloads/event.go @@ -14,9 +14,13 @@ const ( Succeed = "Succeed" FailedUpdateStatus = "FailedUpdateStatus" FailedCreatePodGroup = "FailedCreatePodGroup" + FailedReconcilePodGroup = "FailedReconcilePodGroup" FailedCreateRevision = "FailedCreateRevision" FailedReconcileDiscoveryConfigMap = "FailedReconcileDiscoveryConfigMap" SucceedCreateRevision = "SucceedCreateRevision" + // InvalidGangSchedulingAnnotations is emitted when group-gang-scheduling and + // role-instance-gang-scheduling annotations are set simultaneously on the same RBG. + InvalidGangSchedulingAnnotations = "InvalidGangSchedulingAnnotations" ) // rbg-scaling-adapter events diff --git a/internal/controller/workloads/rolebasedgroup_controller.go b/internal/controller/workloads/rolebasedgroup_controller.go index 6c1950ba..a4a80940 100644 --- a/internal/controller/workloads/rolebasedgroup_controller.go +++ b/internal/controller/workloads/rolebasedgroup_controller.go @@ -81,15 +81,17 @@ type RoleBasedGroupReconciler struct { recorder record.EventRecorder workloadReconciler map[string]reconciler.WorkloadReconciler reconcilerMu sync.RWMutex + podGroupManager scheduler.PodGroupManager } -func NewRoleBasedGroupReconciler(mgr ctrl.Manager) *RoleBasedGroupReconciler { +func NewRoleBasedGroupReconciler(mgr ctrl.Manager, schedulerPlugin scheduler.SchedulerPluginType) *RoleBasedGroupReconciler { return &RoleBasedGroupReconciler{ client: mgr.GetClient(), apiReader: mgr.GetAPIReader(), scheme: mgr.GetScheme(), recorder: mgr.GetEventRecorderFor("RoleBasedGroup"), workloadReconciler: make(map[string]reconciler.WorkloadReconciler), + podGroupManager: scheduler.NewPodGroupManager(schedulerPlugin, 
mgr.GetClient()), } } @@ -170,12 +172,18 @@ func (r *RoleBasedGroupReconciler) Reconcile(ctx context.Context, req ctrl.Reque return ctrl.Result{}, err } - // Step 6: Reconcile roles, do create/update actions for roles. + // Step 6: Reconcile PodGroup for gang scheduling (annotation-driven). + if err := r.reconcilePodGroup(ctx, rbg); err != nil { + r.recorder.Event(rbg, corev1.EventTypeWarning, FailedReconcilePodGroup, err.Error()) + return ctrl.Result{}, err + } + + // Step 7: Reconcile roles, do create/update actions for roles. if err := r.reconcileRoles(ctx, rbg, expectedRolesRevisionHash, scalingTargets, rollingUpdateStrategies); err != nil { return ctrl.Result{}, err } - // Step 7: Cleanup orphaned resources + // Step 8: Cleanup orphaned resources if err := r.cleanup(ctx, rbg); err != nil { return ctrl.Result{}, err } @@ -221,6 +229,23 @@ func (r *RoleBasedGroupReconciler) handleRevisions(ctx context.Context, rbg *wor func (r *RoleBasedGroupReconciler) preCheck(ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup) error { logger := log.FromContext(ctx) + + // Validate that group-gang-scheduling and role-instance-gang-scheduling are not both set + // on the RBG metadata.annotations, as they are mutually exclusive at the RBG level. 
+ if rbg.Annotations[constants.GangSchedulingAnnotationKey] == "true" && + rbg.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey] == "true" { + err := fmt.Errorf( + "annotations %q and %q cannot be set simultaneously on the same RoleBasedGroup; "+ + "use %q for group-level gang scheduling, or set %q per role via role.Annotations", + constants.GangSchedulingAnnotationKey, + constants.RoleInstanceGangSchedulingAnnotationKey, + constants.GangSchedulingAnnotationKey, + constants.RoleInstanceGangSchedulingAnnotationKey, + ) + r.recorder.Event(rbg, corev1.EventTypeWarning, InvalidGangSchedulingAnnotations, err.Error()) + return err + } + // Validate RoleTemplates if err := workloadsv1alpha2.ValidateRoleTemplates(rbg); err != nil { r.recorder.Event(rbg, corev1.EventTypeWarning, InvalidRoleTemplates, err.Error()) @@ -369,6 +394,16 @@ func (r *RoleBasedGroupReconciler) reconcileRefinedDiscoveryConfigMap( return utils.PatchObjectApplyConfiguration(ctx, r.client, cmApplyConfig, utils.PatchSpec) } +func (r *RoleBasedGroupReconciler) reconcilePodGroup( + ctx context.Context, + rbg *workloadsv1alpha2.RoleBasedGroup, +) error { + if r.podGroupManager == nil { + return nil + } + return r.podGroupManager.ReconcilePodGroup(ctx, rbg, runtimeController, &watchedWorkload, r.apiReader) +} + func (r *RoleBasedGroupReconciler) reconcileRoles( ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup, @@ -534,6 +569,11 @@ func (r *RoleBasedGroupReconciler) getOrCreateWorkloadReconciler( return nil, err } + // Inject PodGroupManager if the reconciler supports it (PodGroupManagerSetter). 
+ if setter, ok := rec.(reconciler.PodGroupManagerSetter); ok { + setter.SetPodGroupManager(r.podGroupManager) + } + // Cache the reconciler r.workloadReconciler[workloadType] = rec return rec, nil diff --git a/pkg/constants/annotation.go b/pkg/constants/annotation.go index dba5a2df..50380639 100644 --- a/pkg/constants/annotation.go +++ b/pkg/constants/annotation.go @@ -27,6 +27,27 @@ const ( // DisableExclusiveKeyAnnotationKey can be set to "true" on a Pod template // to skip exclusive-topology affinity injection for that pod. DisableExclusiveKeyAnnotationKey = RBGPrefix + "role-disable-exclusive" + + // GangSchedulingAnnotationKey enables gang scheduling for a RoleBasedGroup when set to "true". + // When enabled, the controller will create a PodGroup CR managed by the scheduler + // configured via --scheduler-name flag (scheduler-plugins or volcano). + // Setting this annotation automatically derives RoleInstanceGangSchedulingAnnotationKey + // for each role's RoleInstanceSet, so they must NOT be set simultaneously. + // Example: rbg.workloads.x-k8s.io/group-gang-scheduling: "true" + GangSchedulingAnnotationKey = RBGPrefix + "group-gang-scheduling" + + // GangSchedulingScheduleTimeoutSecondsKey specifies the schedule timeout seconds for + // scheduler-plugins based gang scheduling. Defaults to 60 seconds if not set. + // Example: rbg.workloads.x-k8s.io/group-gang-scheduling-timeout: "120" + GangSchedulingScheduleTimeoutSecondsKey = RBGPrefix + "group-gang-scheduling-timeout" + + // GangSchedulingVolcanoPriorityClassKey specifies the PriorityClassName for volcano gang scheduling. + // Example: rbg.workloads.x-k8s.io/group-gang-scheduling-volcano-priority: "system-node-critical" + GangSchedulingVolcanoPriorityClassKey = RBGPrefix + "group-gang-scheduling-volcano-priority" + + // GangSchedulingVolcanoQueueKey specifies the Queue for volcano gang scheduling. 
+ // Example: rbg.workloads.x-k8s.io/group-gang-scheduling-volcano-queue: "default" + GangSchedulingVolcanoQueueKey = RBGPrefix + "group-gang-scheduling-volcano-queue" ) // Role level annotations @@ -44,6 +65,25 @@ const ( // RoleInstancePatternKey identifies the RoleInstance organization pattern (Stateful/Stateless) RoleInstancePatternKey = RBGPrefix + "role-instance-pattern" + // RoleInstanceGangSchedulingAnnotationKey enables gang-scheduling aware behavior at the + // RoleInstance level when set to "true". It is derived automatically from the RBG-level + // GangSchedulingAnnotationKey annotation during RoleInstanceSet reconciliation, but users + // can also set it explicitly in role.Annotations within the RBG spec. + // + // NOTE: This annotation must NOT be set on the RBG object (metadata.annotations) directly + // when GangSchedulingAnnotationKey is already set, as they are mutually exclusive at the + // RBG level. Use either GangSchedulingAnnotationKey (group-level) or set + // RoleInstanceGangSchedulingAnnotationKey per role via role.Annotations, not both. + // + // When enabled, the RoleInstance controller enforces gang-scheduling constraints: + // 1. If any orphan pod (not yet GC'd) exists, pod creation fails immediately instead + // of silently skipping — preventing partial group startup. + // 2. If an in-place update cannot be applied to a pod, all pods of the instance are + // recreated atomically so the PodGroup minimum member requirement is met. + // + // Example: rbg.workloads.x-k8s.io/role-instance-gang-scheduling: "true" + RoleInstanceGangSchedulingAnnotationKey = RBGPrefix + "role-instance-gang-scheduling" + // DiscoveryConfigModeAnnotationKey identifies discovery config handling mode. 
DiscoveryConfigModeAnnotationKey = RBGPrefix + "discovery-config-mode" ) diff --git a/pkg/reconciler/deploy_reconciler.go b/pkg/reconciler/deploy_reconciler.go index 28c46f5c..bc7feb12 100644 --- a/pkg/reconciler/deploy_reconciler.go +++ b/pkg/reconciler/deploy_reconciler.go @@ -22,12 +22,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils" ) type DeploymentReconciler struct { - scheme *runtime.Scheme - client client.Client + scheme *runtime.Scheme + client client.Client + podGroupManager scheduler.PodGroupManager } var _ WorkloadReconciler = &DeploymentReconciler{} @@ -36,6 +38,11 @@ func NewDeploymentReconciler(scheme *runtime.Scheme, client client.Client) *Depl return &DeploymentReconciler{scheme: scheme, client: client} } +// SetPodGroupManager implements PodGroupManagerSetter. +func (r *DeploymentReconciler) SetPodGroupManager(m scheduler.PodGroupManager) { + r.podGroupManager = m +} + func (r *DeploymentReconciler) Validate( ctx context.Context, role *workloadsv1alpha2.RoleSpec) error { logger := log.FromContext(ctx) @@ -112,6 +119,7 @@ func (r *DeploymentReconciler) constructDeployApplyConfiguration( } podReconciler := NewPodReconciler(r.scheme, r.client) + podReconciler.SetPodGroupManager(r.podGroupManager) podTemplateApplyConfiguration, err := podReconciler.ConstructPodTemplateSpecApplyConfiguration( ctx, rbg, role, maps.Clone(matchLabels), ) diff --git a/pkg/reconciler/lws_reconciler.go b/pkg/reconciler/lws_reconciler.go index e75ff97d..785194e1 100644 --- a/pkg/reconciler/lws_reconciler.go +++ b/pkg/reconciler/lws_reconciler.go @@ -27,12 +27,14 @@ import ( lwsapplyv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1" workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils" ) type 
LeaderWorkerSetReconciler struct { - scheme *runtime.Scheme - client client.Client + scheme *runtime.Scheme + client client.Client + podGroupManager scheduler.PodGroupManager } var _ WorkloadReconciler = &LeaderWorkerSetReconciler{} @@ -41,6 +43,11 @@ func NewLeaderWorkerSetReconciler(scheme *runtime.Scheme, client client.Client) return &LeaderWorkerSetReconciler{scheme: scheme, client: client} } +// SetPodGroupManager implements PodGroupManagerSetter. +func (r *LeaderWorkerSetReconciler) SetPodGroupManager(m scheduler.PodGroupManager) { + r.podGroupManager = m +} + func (r *LeaderWorkerSetReconciler) Validate( ctx context.Context, role *workloadsv1alpha2.RoleSpec) error { logger := log.FromContext(ctx) @@ -226,6 +233,7 @@ func (r *LeaderWorkerSetReconciler) constructLWSApplyConfiguration( // leaderTemplate podReconciler := NewPodReconciler(r.scheme, r.client) + podReconciler.SetPodGroupManager(r.podGroupManager) // KEP-8: use applyStrategicMergePatch leaderTemp, err := applyStrategicMergePatch(baseTemplate, leaderPatch) if err != nil { @@ -247,6 +255,7 @@ func (r *LeaderWorkerSetReconciler) constructLWSApplyConfiguration( return nil, err } workerPodReconciler := NewPodReconciler(r.scheme, r.client) + workerPodReconciler.SetPodGroupManager(r.podGroupManager) // workerTemplate do not need to inject sidecar workerPodReconciler.SetInjectors([]string{"config", "common_env"}) workerTemplateApplyCfg, err := workerPodReconciler.ConstructPodTemplateSpecApplyConfiguration( diff --git a/pkg/reconciler/pod_reconciler.go b/pkg/reconciler/pod_reconciler.go index 01af84ad..73b3fafd 100644 --- a/pkg/reconciler/pod_reconciler.go +++ b/pkg/reconciler/pod_reconciler.go @@ -17,13 +17,15 @@ import ( workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" "sigs.k8s.io/rbgs/pkg/discovery" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils" ) type PodReconciler struct { - scheme *runtime.Scheme - client client.Client - injectObjects 
[]string + scheme *runtime.Scheme + client client.Client + injectObjects []string + podGroupManager scheduler.PodGroupManager } func NewPodReconciler(scheme *runtime.Scheme, client client.Client) *PodReconciler { @@ -37,6 +39,12 @@ func (r *PodReconciler) SetInjectors(injectObjects []string) { r.injectObjects = injectObjects } +// SetPodGroupManager configures the PodGroupManager used to inject gang-scheduling +// labels/annotations into pod templates. +func (r *PodReconciler) SetPodGroupManager(m scheduler.PodGroupManager) { + r.podGroupManager = m +} + func (r *PodReconciler) ConstructPodTemplateSpecApplyConfiguration( ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup, @@ -123,6 +131,11 @@ func (r *PodReconciler) ConstructPodTemplateSpecApplyConfiguration( return nil, err } + // Inject gang-scheduling labels/annotations if a PodGroupManager is configured. + if r.podGroupManager != nil { + r.podGroupManager.InjectPodGroupLabels(rbg, podTemplateApplyConfiguration) + } + podTemplateApplyConfiguration.WithLabels(podLabels).WithAnnotations(podAnnotations) return podTemplateApplyConfiguration, nil } diff --git a/pkg/reconciler/roleinstance/sync/instance_scale.go b/pkg/reconciler/roleinstance/sync/instance_scale.go index c7de7f9a..0a207745 100644 --- a/pkg/reconciler/roleinstance/sync/instance_scale.go +++ b/pkg/reconciler/roleinstance/sync/instance_scale.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/rbgs/pkg/constants" "sigs.k8s.io/rbgs/pkg/utils" workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" @@ -60,6 +61,23 @@ func (c *realControl) calculateDiffsWithExpectation(ctx context.Context, updateI return &expectationDiff{toDeleteNum: len(pods), toDeletePod: pods}, nil } + if isGangSchedulingEnabled(updateInstance) { + for i := range pods { + oldRevision := currentRevision + for _, r := range revisions { + if instanceutil.EqualToRevisionHash("", pods[i], 
r.Name) { + oldRevision = r + break + } + } + if !c.inplaceControl.CanUpdateInPlace(ctx, oldRevision, updateRevision, coreControl.GetUpdateOptions()) { + c.recorder.Event(updateInstance, v1.EventTypeNormal, "ReCreateInstance", fmt.Sprintf("component %s can't inplace updated, "+ + "recreate all pods of instance: %v", instanceutil.GetPodComponentName(pods[i]), klog.KObj(updateInstance))) + return &expectationDiff{toDeleteNum: len(pods), toDeletePod: pods}, nil + } + } + } + var ( toDeleteNum = 0 toDeletePods []*v1.Pod @@ -105,6 +123,9 @@ func (c *realControl) createPods(ctx context.Context, updateInstance *workloadsv toCreatePodNum := 0 for _, p := range newPods { if c.hasOrphanPod(p.Namespace, p.Name) { + if isGangSchedulingEnabled(updateInstance) { + return false, fmt.Errorf("orphan pod %v has not been gc, fail to create new pod", klog.KObj(p)) + } continue } toCreatePodNum++ @@ -221,3 +242,10 @@ func getExpectedPodCount(instance *workloadsv1alpha2.RoleInstance) int { } return expectedPodCount } + +// isGangSchedulingEnabled reports whether gang-scheduling constraints are active for the +// given RoleInstance. The annotation is derived from the parent RBG's gang-scheduling +// annotation during RoleInstanceSet reconciliation, or set directly via role.Annotations. 
+func isGangSchedulingEnabled(instance *workloadsv1alpha2.RoleInstance) bool { + return instance.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey] == "true" +} diff --git a/pkg/reconciler/roleinstanceset/statefulmode/stateful_instance_set_utils.go b/pkg/reconciler/roleinstanceset/statefulmode/stateful_instance_set_utils.go index 81b6ae6b..b8d13a0d 100644 --- a/pkg/reconciler/roleinstanceset/statefulmode/stateful_instance_set_utils.go +++ b/pkg/reconciler/roleinstanceset/statefulmode/stateful_instance_set_utils.go @@ -224,6 +224,12 @@ func newVersionedInstance( *metav1.NewControllerRef(currentSet, controllerKind), } + // Propagate gang-scheduling annotation from RoleInstanceSet to RoleInstance so that + // the instance controller can enforce gang-scheduling constraints without access to the RBG. + if v, ok := currentSet.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey]; ok { + instance.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey] = v + } + return instance } diff --git a/pkg/reconciler/roleinstanceset/statelessmode/core/implement.go b/pkg/reconciler/roleinstanceset/statelessmode/core/implement.go index 94238e52..9ffc33eb 100644 --- a/pkg/reconciler/roleinstanceset/statelessmode/core/implement.go +++ b/pkg/reconciler/roleinstanceset/statelessmode/core/implement.go @@ -190,6 +190,15 @@ func GenInstanceFromTemplate(template *appsv1alpha2.RoleInstanceTemplate, set *a instance.OwnerReferences = append(instance.OwnerReferences, *controllerRef) } instance.Spec = *template.RoleInstanceSpec.DeepCopy() + // Propagate gang-scheduling annotation from RoleInstanceSet to RoleInstance so that + // the instance controller can enforce gang-scheduling constraints (e.g. orphan-pod check, + // atomic pod recreation) without access to the parent RBG. 
+ if v, ok := set.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey]; ok { + if instance.Annotations == nil { + instance.Annotations = make(map[string]string) + } + instance.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey] = v + } return instance, nil } diff --git a/pkg/reconciler/roleinstanceset_reconciler.go b/pkg/reconciler/roleinstanceset_reconciler.go index b1694c21..fce286ca 100644 --- a/pkg/reconciler/roleinstanceset_reconciler.go +++ b/pkg/reconciler/roleinstanceset_reconciler.go @@ -16,12 +16,14 @@ import ( workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" workloadsv1alpha2client "sigs.k8s.io/rbgs/client-go/applyconfiguration/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils" ) type RoleInstanceSetReconciler struct { - scheme *runtime.Scheme - client client.Client + scheme *runtime.Scheme + client client.Client + podGroupManager scheduler.PodGroupManager } var _ WorkloadReconciler = &RoleInstanceSetReconciler{} @@ -33,6 +35,11 @@ func NewRoleInstanceSetReconciler(scheme *runtime.Scheme, client client.Client) } } +// SetPodGroupManager implements PodGroupManagerSetter. +func (r *RoleInstanceSetReconciler) SetPodGroupManager(m scheduler.PodGroupManager) { + r.podGroupManager = m +} + func (r *RoleInstanceSetReconciler) Validate( ctx context.Context, role *workloadsv1alpha2.RoleSpec) error { logger := log.FromContext(ctx) @@ -124,6 +131,11 @@ func (r *RoleInstanceSetReconciler) constructRoleInstanceSetApplyConfiguration( if role.Annotations[constants.RoleInstancePatternKey] == "" { roleInstanceSetAnnotation[constants.RoleInstancePatternKey] = string(constants.StatefulPattern) } + // Derive role-instance-level gang scheduling annotation from the RBG-level gang annotation. + // This lets users also set it explicitly per role via role.Annotations. 
+ if rbg.Annotations[constants.GangSchedulingAnnotationKey] == "true" { + roleInstanceSetAnnotation[constants.RoleInstanceGangSchedulingAnnotationKey] = "true" + } // 1. construct role instance configuration var restartPolicy workloadsv1alpha2.RoleInstanceRestartPolicyType @@ -240,6 +252,7 @@ func (r *RoleInstanceSetReconciler) constructRoleInstanceTemplateByCustomCompone roleInstanceTemplateConfig *workloadsv1alpha2client.RoleInstanceTemplateApplyConfiguration, ) error { podReconciler := NewPodReconciler(r.scheme, r.client) + podReconciler.SetPodGroupManager(r.podGroupManager) for _, component := range role.GetCustomComponentsPattern().Components { podTemplateApplyConfiguration, err := podReconciler.ConstructPodTemplateSpecApplyConfiguration( ctx, rbg, role, maps.Clone(matchLabels), component.Template) @@ -286,6 +299,7 @@ func (r *RoleInstanceSetReconciler) constructRoleInstanceTemplateByLeaderWorkerP } leaderPodReconciler := NewPodReconciler(r.scheme, r.client) + leaderPodReconciler.SetPodGroupManager(r.podGroupManager) leaderPodReconciler.SetInjectors([]string{"config", "sidecar", "common_env", "lwp_env"}) leaderTemplateApplyCfg, err := leaderPodReconciler.ConstructPodTemplateSpecApplyConfiguration( ctx, rbg, role, matchLabels, *leaderTemp, @@ -303,6 +317,7 @@ func (r *RoleInstanceSetReconciler) constructRoleInstanceTemplateByLeaderWorkerP } workerPodReconciler := NewPodReconciler(r.scheme, r.client) + workerPodReconciler.SetPodGroupManager(r.podGroupManager) // workerTemplate do not need to inject sidecar workerPodReconciler.SetInjectors([]string{"config", "common_env", "lwp_env"}) workerTemplateApplyCfg, err := workerPodReconciler.ConstructPodTemplateSpecApplyConfiguration( @@ -353,6 +368,7 @@ func (r *RoleInstanceSetReconciler) constructRoleInstanceTemplateFromStandaloneP } podReconciler := NewPodReconciler(r.scheme, r.client) + podReconciler.SetPodGroupManager(r.podGroupManager) podTemplateApplyConfiguration, err := 
podReconciler.ConstructPodTemplateSpecApplyConfiguration( ctx, rbg, role, maps.Clone(matchLabels), ) diff --git a/pkg/reconciler/sts_reconciler.go b/pkg/reconciler/sts_reconciler.go index faac8715..35435d7c 100644 --- a/pkg/reconciler/sts_reconciler.go +++ b/pkg/reconciler/sts_reconciler.go @@ -24,12 +24,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils" ) type StatefulSetReconciler struct { - scheme *runtime.Scheme - client client.Client + scheme *runtime.Scheme + client client.Client + podGroupManager scheduler.PodGroupManager } var _ WorkloadReconciler = &StatefulSetReconciler{} @@ -41,6 +43,11 @@ func NewStatefulSetReconciler(scheme *runtime.Scheme, client client.Client) *Sta } } +// SetPodGroupManager implements PodGroupManagerSetter. +func (r *StatefulSetReconciler) SetPodGroupManager(m scheduler.PodGroupManager) { + r.podGroupManager = m +} + func (r *StatefulSetReconciler) Validate( ctx context.Context, role *workloadsv1alpha2.RoleSpec) error { logger := log.FromContext(ctx) @@ -439,6 +446,7 @@ func (r *StatefulSetReconciler) constructStatefulSetApplyConfiguration( } podReconciler := NewPodReconciler(r.scheme, r.client) + podReconciler.SetPodGroupManager(r.podGroupManager) podTemplateApplyConfiguration, err := podReconciler.ConstructPodTemplateSpecApplyConfiguration( ctx, rbg, role, maps.Clone(matchLabels), ) diff --git a/pkg/reconciler/workload_reconciler.go b/pkg/reconciler/workload_reconciler.go index 96f7db79..553967b4 100644 --- a/pkg/reconciler/workload_reconciler.go +++ b/pkg/reconciler/workload_reconciler.go @@ -12,6 +12,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" ) type WorkloadReconciler interface { @@ -29,6 +30,12 @@ type WorkloadReconciler 
interface { RecreateWorkload(ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup, role *workloadsv1alpha2.RoleSpec) error } +// PodGroupManagerSetter is an optional interface implemented by WorkloadReconcilers +// that support injecting a PodGroupManager for gang-scheduling label injection. +type PodGroupManagerSetter interface { + SetPodGroupManager(m scheduler.PodGroupManager) +} + func NewWorkloadReconciler( workload workloadsv1alpha2.WorkloadSpec, scheme *runtime.Scheme, client client.Client, ) (WorkloadReconciler, error) { diff --git a/pkg/scheduler/k8s-scheduler-plugin/manager.go b/pkg/scheduler/k8s-scheduler-plugin/manager.go new file mode 100644 index 00000000..7af4c475 --- /dev/null +++ b/pkg/scheduler/k8s-scheduler-plugin/manager.go @@ -0,0 +1,195 @@ +/* +Copyright 2024 The RoleBasedGroup Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package kubeschedulerplugin implements the PodGroupManager interface for +// the Kubernetes scheduler-plugins PodGroup (scheduling.x-k8s.io). 
+package kubeschedulerplugin + +import ( + "context" + "fmt" + "strconv" + "sync" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" + "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/utils" + schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" +) + +const ( + // CrdName is the CRD name for the kube scheduler-plugins PodGroup. + CrdName = "podgroups.scheduling.x-k8s.io" + + // LabelKey is the pod label key used to associate a pod with a PodGroup. + LabelKey = "pod-group.scheduling.sigs.k8s.io/name" + + defaultScheduleTimeoutSeconds = int32(60) +) + +// PodGroupManager manages kube scheduler-plugins PodGroups for gang scheduling. +type PodGroupManager struct { + client client.Client +} + +// New returns a new PodGroupManager for the kube scheduler plugin. +func New(c client.Client) *PodGroupManager { + return &PodGroupManager{client: c} +} + +// ReconcilePodGroup creates, updates, or deletes the kube PodGroup +// based on the gang-scheduling annotation on the RBG. 
+func (m *PodGroupManager) ReconcilePodGroup( + ctx context.Context, + rbg *workloadsv1alpha2.RoleBasedGroup, + runtimeController *builder.TypedBuilder[reconcile.Request], + watchedWorkload *sync.Map, + apiReader client.Reader, +) error { + if !isGangSchedulingEnabled(rbg) { + return m.deletePodGroup(ctx, rbg, watchedWorkload) + } + + if _, loaded := watchedWorkload.Load(CrdName); !loaded { + if err := utils.CheckCrdExists(apiReader, CrdName); err != nil { + return fmt.Errorf("scheduling plugin %s not ready", CrdName) + } + watchedWorkload.LoadOrStore(CrdName, struct{}{}) + runtimeController.Owns(&schedv1alpha1.PodGroup{}) + } + + return m.createOrUpdate(ctx, rbg) +} + +// InjectPodGroupLabels injects the kube PodGroup label into the pod template spec. +func (m *PodGroupManager) InjectPodGroupLabels( + rbg *workloadsv1alpha2.RoleBasedGroup, + pts *coreapplyv1.PodTemplateSpecApplyConfiguration, +) { + if isGangSchedulingEnabled(rbg) { + pts.WithLabels(map[string]string{LabelKey: rbg.Name}) + } +} + +func isGangSchedulingEnabled(rbg *workloadsv1alpha2.RoleBasedGroup) bool { + return rbg.Annotations[constants.GangSchedulingAnnotationKey] == "true" +} + +func getScheduleTimeoutSeconds(rbg *workloadsv1alpha2.RoleBasedGroup) *int32 { + if rbg.Annotations != nil { + if v, ok := rbg.Annotations[constants.GangSchedulingScheduleTimeoutSecondsKey]; ok { + if parsed, err := strconv.ParseInt(v, 10, 32); err == nil { + t := int32(parsed) + return &t + } + } + } + t := defaultScheduleTimeoutSeconds + return &t +} + +func (m *PodGroupManager) createOrUpdate(ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup) error { + logger := log.FromContext(ctx) + gvk := utils.GetRbgGVK() + podGroup := &schedv1alpha1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: rbg.Name, + Namespace: rbg.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(rbg, gvk), + }, + }, + Spec: schedv1alpha1.PodGroupSpec{ + MinMember: int32(rbg.GetGroupSize()), + 
ScheduleTimeoutSeconds: getScheduleTimeoutSeconds(rbg), + }, + } + + err := m.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup) + if err != nil && !apierrors.IsNotFound(err) { + logger.Error(err, "get pod group error") + return err + } + + if apierrors.IsNotFound(err) { + if createErr := m.client.Create(ctx, podGroup); createErr != nil { + logger.Error(createErr, "create pod group error") + return createErr + } + return nil + } + + desiredMinMember := int32(rbg.GetGroupSize()) + desiredTimeout := getScheduleTimeoutSeconds(rbg) + if podGroup.Spec.MinMember != desiredMinMember || + (podGroup.Spec.ScheduleTimeoutSeconds == nil || *podGroup.Spec.ScheduleTimeoutSeconds != *desiredTimeout) { + updateErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if fetchErr := m.client.Get( + ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup, + ); fetchErr != nil { + return fetchErr + } + if !utils.CheckOwnerReference(podGroup.OwnerReferences, gvk) { + podGroup.OwnerReferences = append(podGroup.OwnerReferences, *metav1.NewControllerRef(rbg, gvk)) + } + podGroup.Spec.MinMember = desiredMinMember + podGroup.Spec.ScheduleTimeoutSeconds = desiredTimeout + return m.client.Update(ctx, podGroup) + }) + if updateErr != nil { + logger.Error(updateErr, "update pod group error") + return updateErr + } + } + + return nil +} + +func (m *PodGroupManager) deletePodGroup( + ctx context.Context, + rbg *workloadsv1alpha2.RoleBasedGroup, + watchedWorkload *sync.Map, +) error { + if _, loaded := watchedWorkload.Load(CrdName); !loaded { + return nil + } + + podGroup := &schedv1alpha1.PodGroup{} + err := m.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + if metav1.IsControlledBy(podGroup, rbg) { + if deleteErr := m.client.Delete(ctx, podGroup); deleteErr != nil { + return deleteErr + } + } + + 
return nil +} diff --git a/pkg/scheduler/podgroup_manager.go b/pkg/scheduler/podgroup_manager.go index 3df00bb4..8968e29b 100644 --- a/pkg/scheduler/podgroup_manager.go +++ b/pkg/scheduler/podgroup_manager.go @@ -2,215 +2,70 @@ package scheduler import ( "context" - "fmt" "sync" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1" - "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" - workloadsv1alpha "sigs.k8s.io/rbgs/api/workloads/v1alpha1" - "sigs.k8s.io/rbgs/pkg/utils" - schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" - volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" + workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" + kubeschedulerplugin "sigs.k8s.io/rbgs/pkg/scheduler/k8s-scheduler-plugin" + volcanoplugin "sigs.k8s.io/rbgs/pkg/scheduler/volcano" ) const ( - KubePodGroupLabelKey = "pod-group.scheduling.sigs.k8s.io/name" - VolcanoPodGroupAnnotationKey = "scheduling.k8s.io/group-name" + // KubePodGroupLabelKey is the pod label key used by the kube scheduler-plugins PodGroup. + // Kept here for external consumers (e.g. e2e tests). + KubePodGroupLabelKey = kubeschedulerplugin.LabelKey - // KubePodGroupCrdName is PodGroup CRD Name - KubePodGroupCrdName = "podgroups.scheduling.x-k8s.io" + // VolcanoPodGroupAnnotationKey is the pod annotation key used by Volcano PodGroup. + // Kept here for external consumers (e.g. e2e tests). 
+ VolcanoPodGroupAnnotationKey = volcanoplugin.AnnotationKey - VolcanoPodGroupCrdName = "podgroups.scheduling.volcano.sh" -) - -type PodGroupScheduler struct { - client client.Client -} - -func NewPodGroupScheduler(client client.Client) *PodGroupScheduler { - return &PodGroupScheduler{client: client} -} - -func (r *PodGroupScheduler) Reconcile(ctx context.Context, rbg *workloadsv1alpha.RoleBasedGroup, runtimeController *builder.TypedBuilder[reconcile.Request], watchedWorkload *sync.Map, apiReader client.Reader) error { - // not support change podGroup scheduler - if rbg.IsKubeGangScheduling() { - // check and load kube podGroup CRD - _, podGroupExist := watchedWorkload.Load(KubePodGroupCrdName) - if !podGroupExist { - if err := utils.CheckCrdExists(apiReader, KubePodGroupCrdName); err != nil { - return fmt.Errorf("scheduling plugin %s not ready", KubePodGroupCrdName) - } - watchedWorkload.LoadOrStore(KubePodGroupCrdName, struct{}{}) - runtimeController.Owns(&schedv1alpha1.PodGroup{}) - } - - return r.createOrUpdateKubePodGroup(ctx, rbg) - } else if rbg.IsVolcanoGangScheduling() { - // check and load volcano podGroup CRD - _, podGroupExist := watchedWorkload.Load(VolcanoPodGroupCrdName) - if !podGroupExist { - if err := utils.CheckCrdExists(apiReader, VolcanoPodGroupCrdName); err != nil { - return fmt.Errorf("scheduling plugin %s not ready", VolcanoPodGroupCrdName) - } - watchedWorkload.LoadOrStore(VolcanoPodGroupCrdName, struct{}{}) - runtimeController.Owns(&volcanoschedulingv1beta1.PodGroup{}) - } - - return r.createOrUpdateVolcanoPodGroup(ctx, rbg) - } else { - return r.deletePodGroup(ctx, rbg, watchedWorkload) - } -} - -func InjectPodGroupProtocol(rbg *workloadsv1alpha.RoleBasedGroup, pts *coreapplyv1.PodTemplateSpecApplyConfiguration) { - if rbg.IsKubeGangScheduling() { - pts.WithLabels(map[string]string{KubePodGroupLabelKey: rbg.Name}) - } else if rbg.IsVolcanoGangScheduling() { - pts.WithAnnotations(map[string]string{VolcanoPodGroupAnnotationKey: rbg.Name}) - 
} -} - -func (r *PodGroupScheduler) createOrUpdateVolcanoPodGroup(ctx context.Context, rbg *workloadsv1alpha.RoleBasedGroup) error { - logger := log.FromContext(ctx) - podGroup := &volcanoschedulingv1beta1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: rbg.Name, - Namespace: rbg.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(rbg, rbg.GroupVersionKind()), - }, - }, - Spec: volcanoschedulingv1beta1.PodGroupSpec{ - MinMember: int32(rbg.GetGroupSize()), - Queue: rbg.Spec.PodGroupPolicy.VolcanoScheduling.Queue, - PriorityClassName: rbg.Spec.PodGroupPolicy.VolcanoScheduling.PriorityClassName, - }, - } - - err := r.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup) - if err != nil && !apierrors.IsNotFound(err) { - logger.Error(err, "get pod group error") - return err - } + // KubePodGroupCrdName is the CRD name for the kube scheduler-plugins PodGroup. + // Kept here for external consumers (e.g. controller SetupWithManager). 
+ KubePodGroupCrdName = kubeschedulerplugin.CrdName - if apierrors.IsNotFound(err) { - err = r.client.Create(ctx, podGroup) - if err != nil { - logger.Error(err, "create pod group error") - } - return err - } - - if podGroup.Spec.MinMember != int32(rbg.GetGroupSize()) || podGroup.Spec.Queue != rbg.Spec.PodGroupPolicy.VolcanoScheduling.Queue || - podGroup.Spec.PriorityClassName != rbg.Spec.PodGroupPolicy.VolcanoScheduling.PriorityClassName { - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - if err := r.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup); err != nil { - return err - } - podGroup.Spec.MinMember = int32(rbg.GetGroupSize()) - podGroup.Spec.Queue = rbg.Spec.PodGroupPolicy.VolcanoScheduling.Queue - podGroup.Spec.PriorityClassName = rbg.Spec.PodGroupPolicy.VolcanoScheduling.PriorityClassName - updateErr := r.client.Update(ctx, podGroup) - return updateErr - }) - if err != nil { - logger.Error(err, "update pod group error") - } - return err - } - - return nil -} - -func (r *PodGroupScheduler) createOrUpdateKubePodGroup(ctx context.Context, rbg *workloadsv1alpha.RoleBasedGroup) error { - logger := log.FromContext(ctx) - gvk := utils.GetRbgGVK() - podGroup := &schedv1alpha1.PodGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: rbg.Name, - Namespace: rbg.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(rbg, rbg.GroupVersionKind()), - }, - }, - Spec: schedv1alpha1.PodGroupSpec{ - MinMember: int32(rbg.GetGroupSize()), - ScheduleTimeoutSeconds: rbg.Spec.PodGroupPolicy.KubeScheduling.ScheduleTimeoutSeconds, - }, - } + // VolcanoPodGroupCrdName is the CRD name for the Volcano PodGroup. + // Kept here for external consumers (e.g. controller SetupWithManager). 
+ VolcanoPodGroupCrdName = volcanoplugin.CrdName +) - err := r.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup) - if err != nil && !apierrors.IsNotFound(err) { - logger.Error(err, "get pod group error") - return err - } +// SchedulerPluginType defines the supported scheduler plugin types. +type SchedulerPluginType string - if apierrors.IsNotFound(err) { - err = r.client.Create(ctx, podGroup) - if err != nil { - logger.Error(err, "create pod group error") - } - return err - } +const ( + // KubeSchedulerPlugin uses the Kubernetes scheduler-plugins PodGroup. + KubeSchedulerPlugin SchedulerPluginType = "scheduler-plugins" - if podGroup.Spec.MinMember != int32(rbg.GetGroupSize()) { - err = retry.RetryOnConflict( - retry.DefaultRetry, func() error { - if err := r.client.Get( - ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup, - ); err != nil { - return err - } - if !utils.CheckOwnerReference(podGroup.OwnerReferences, utils.GetRbgGVK()) { - podGroup.OwnerReferences = append(podGroup.OwnerReferences, *metav1.NewControllerRef(rbg, gvk)) - } - podGroup.Spec.MinMember = int32(rbg.GetGroupSize()) - updateErr := r.client.Update(ctx, podGroup) - return updateErr - }, - ) - if err != nil { - logger.Error(err, "update pod group error") - } - return err - } + // VolcanoSchedulerPlugin uses the Volcano PodGroup. + VolcanoSchedulerPlugin SchedulerPluginType = "volcano" +) - return nil +// PodGroupManager is the interface for managing PodGroups in gang scheduling scenarios. +// Implementations are selected at controller startup based on the --scheduler-name flag. +type PodGroupManager interface { + // ReconcilePodGroup creates, updates, or deletes the PodGroup for the given RBG + // based on the gang-scheduling annotation. 
+ ReconcilePodGroup( + ctx context.Context, + rbg *workloadsv1alpha2.RoleBasedGroup, + runtimeController *builder.TypedBuilder[reconcile.Request], + watchedWorkload *sync.Map, + apiReader client.Reader, + ) error + + // InjectPodGroupLabels injects the scheduler-specific pod labels/annotations + // required for the pod to join the PodGroup. + InjectPodGroupLabels(rbg *workloadsv1alpha2.RoleBasedGroup, pts *coreapplyv1.PodTemplateSpecApplyConfiguration) } -func (r *PodGroupScheduler) deletePodGroup(ctx context.Context, rbg *workloadsv1alpha.RoleBasedGroup, watchedWorkload *sync.Map) error { - if _, podGroupExist := watchedWorkload.Load(KubePodGroupCrdName); podGroupExist { - kubePodGroup := &schedv1alpha1.PodGroup{} - if err := r.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, kubePodGroup); err == nil { - if metav1.IsControlledBy(kubePodGroup, rbg) { - if deleteErr := r.client.Delete(ctx, kubePodGroup); deleteErr != nil { - return deleteErr - } - } - } else if !apierrors.IsNotFound(err) { - return err - } - } - - if _, podGroupExist := watchedWorkload.Load(VolcanoPodGroupCrdName); podGroupExist { - volcanoPodGroup := &volcanoschedulingv1beta1.PodGroup{} - if err := r.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, volcanoPodGroup); err == nil { - if metav1.IsControlledBy(volcanoPodGroup, rbg) { - if deleteErr := r.client.Delete(ctx, volcanoPodGroup); deleteErr != nil { - return deleteErr - } - } - } else if !apierrors.IsNotFound(err) { - return err - } +// NewPodGroupManager returns a PodGroupManager for the given plugin type. 
+func NewPodGroupManager(plugin SchedulerPluginType, c client.Client) PodGroupManager { + switch plugin { + case VolcanoSchedulerPlugin: + return volcanoplugin.New(c) + default: + return kubeschedulerplugin.New(c) } - - return nil } diff --git a/pkg/scheduler/podgroup_manager_test.go b/pkg/scheduler/podgroup_manager_test.go index d16247d8..07a70b5f 100644 --- a/pkg/scheduler/podgroup_manager_test.go +++ b/pkg/scheduler/podgroup_manager_test.go @@ -17,9 +17,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/reconcile" - workloadsv1alpha "sigs.k8s.io/rbgs/api/workloads/v1alpha1" + workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" + "sigs.k8s.io/rbgs/pkg/constants" "sigs.k8s.io/rbgs/pkg/utils" - wrappers "sigs.k8s.io/rbgs/test/wrappers/v1alpha1" + wrappersv2 "sigs.k8s.io/rbgs/test/wrappers/v1alpha2" schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" ) @@ -27,7 +28,7 @@ import ( func TestPodGroupScheduler_Reconcile(t *testing.T) { // Define test scheme scheme := runtime.NewScheme() - _ = workloadsv1alpha.AddToScheme(scheme) + _ = workloadsv1alpha2.AddToScheme(scheme) _ = schedv1alpha1.AddToScheme(scheme) _ = volcanoschedulingv1beta1.AddToScheme(scheme) _ = apiextensionsv1.AddToScheme(scheme) @@ -62,17 +63,21 @@ func TestPodGroupScheduler_Reconcile(t *testing.T) { tests := []struct { name string client client.Client - rbg *workloadsv1alpha.RoleBasedGroup + rbg *workloadsv1alpha2.RoleBasedGroup + pluginType SchedulerPluginType apiReader client.Reader preFunc func() expectPG bool expectError bool }{ { - name: "create pod group when gang scheduling enabled and pod group not exists", - client: fake.NewClientBuilder().WithScheme(scheme).Build(), - rbg: wrappers.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). 
- WithKubeGangScheduling(true).Obj(), + name: "create pod group when kube gang scheduling enabled and pod group not exists", + client: fake.NewClientBuilder().WithScheme(scheme).Build(), + pluginType: KubeSchedulerPlugin, + rbg: wrappersv2.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). + WithAnnotations(map[string]string{ + constants.GangSchedulingAnnotationKey: "true", + }).Obj(), apiReader: fake.NewClientBuilder().WithScheme(scheme).WithObjects( &apiextensionsv1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{Name: KubePodGroupCrdName}, @@ -90,41 +95,15 @@ func TestPodGroupScheduler_Reconcile(t *testing.T) { expectError: false, }, { - name: "create kube pod group with instanceset lws role should use lws size", - client: fake.NewClientBuilder().WithScheme(scheme).Build(), - rbg: func() *workloadsv1alpha.RoleBasedGroup { - role := wrappers.BuildBasicRole("instance-lws-role"). - WithWorkload(workloadsv1alpha.InstanceSetWorkloadType). - WithReplicas(2).Obj() - role.LeaderWorkerSet = &workloadsv1alpha.LeaderWorkerTemplate{ - Size: ptr.To(int32(3)), - } - - return wrappers.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). - WithRoles([]workloadsv1alpha.RoleSpec{role}). - WithKubeGangScheduling(true).Obj() - }(), - apiReader: fake.NewClientBuilder().WithScheme(scheme).WithObjects( - &apiextensionsv1.CustomResourceDefinition{ - ObjectMeta: metav1.ObjectMeta{Name: KubePodGroupCrdName}, - Status: apiextensionsv1.CustomResourceDefinitionStatus{ - Conditions: []apiextensionsv1.CustomResourceDefinitionCondition{ - { - Type: apiextensionsv1.Established, - Status: apiextensionsv1.ConditionTrue, - }, - }, - }, - }, - ).Build(), - expectPG: true, - expectError: false, - }, - { - name: "create pod group when volcano gang scheduling enabled and pod group not exists", - client: fake.NewClientBuilder().WithScheme(scheme).Build(), - rbg: wrappers.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). 
- WithVolcanoGangScheduling("high-priority", "gpu-queue").Obj(), + name: "create pod group when volcano gang scheduling enabled and pod group not exists", + client: fake.NewClientBuilder().WithScheme(scheme).Build(), + pluginType: VolcanoSchedulerPlugin, + rbg: wrappersv2.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). + WithAnnotations(map[string]string{ + constants.GangSchedulingAnnotationKey: "true", + constants.GangSchedulingVolcanoPriorityClassKey: "high-priority", + constants.GangSchedulingVolcanoQueueKey: "gpu-queue", + }).Obj(), apiReader: fake.NewClientBuilder().WithScheme(scheme).WithObjects( &apiextensionsv1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{Name: VolcanoPodGroupCrdName}, @@ -142,23 +121,10 @@ func TestPodGroupScheduler_Reconcile(t *testing.T) { expectError: false, }, { - name: "rbg with nil PodGroupPolicy", - client: fake.NewClientBuilder().WithScheme(scheme).Build(), - rbg: &workloadsv1alpha.RoleBasedGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: rbgName, - Namespace: rbgNamespace, - }, - Spec: workloadsv1alpha.RoleBasedGroupSpec{ - PodGroupPolicy: nil, - Roles: []workloadsv1alpha.RoleSpec{ - { - Name: "role1", - Replicas: ptr.To[int32](5), // Updated replica count - }, - }, - }, - }, + name: "gang scheduling disabled (no annotation)", + client: fake.NewClientBuilder().WithScheme(scheme).Build(), + pluginType: KubeSchedulerPlugin, + rbg: wrappersv2.BuildBasicRoleBasedGroup(rbgName, rbgNamespace).Obj(), preFunc: func() { watchedWorkload.LoadOrStore(KubePodGroupCrdName, struct{}{}) runtimeController.Owns(&schedv1alpha1.PodGroup{}) @@ -167,50 +133,36 @@ func TestPodGroupScheduler_Reconcile(t *testing.T) { expectError: false, }, { - name: "rbg with nil KubeScheduling", - client: fake.NewClientBuilder().WithScheme(scheme).Build(), - rbg: &workloadsv1alpha.RoleBasedGroup{ - ObjectMeta: metav1.ObjectMeta{ - Name: rbgName, - Namespace: rbgNamespace, - }, - Spec: workloadsv1alpha.RoleBasedGroupSpec{ - PodGroupPolicy: 
&workloadsv1alpha.PodGroupPolicy{ - PodGroupPolicySource: workloadsv1alpha.PodGroupPolicySource{ - KubeScheduling: nil, - }, - }, - Roles: []workloadsv1alpha.RoleSpec{ - { - Name: "role1", - Replicas: ptr.To[int32](5), // Updated replica count - }, - }, - }, + name: "delete pod group when gang scheduling disabled and pod group exists", + client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(podGroup).Build(), + pluginType: KubeSchedulerPlugin, + rbg: wrappersv2.BuildBasicRoleBasedGroup(rbgName, rbgNamespace).Obj(), + preFunc: func() { + watchedWorkload.LoadOrStore(KubePodGroupCrdName, struct{}{}) + runtimeController.Owns(&schedv1alpha1.PodGroup{}) }, expectPG: false, expectError: false, }, { - name: "update pod group when gang scheduling enabled and pod group exists with different min member", - client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(podGroup).Build(), - rbg: &workloadsv1alpha.RoleBasedGroup{ + name: "update pod group when kube gang scheduling enabled and min member changed", + client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(podGroup).Build(), + pluginType: KubeSchedulerPlugin, + rbg: &workloadsv1alpha2.RoleBasedGroup{ ObjectMeta: metav1.ObjectMeta{ Name: rbgName, Namespace: rbgNamespace, - }, - Spec: workloadsv1alpha.RoleBasedGroupSpec{ - PodGroupPolicy: &workloadsv1alpha.PodGroupPolicy{ - PodGroupPolicySource: workloadsv1alpha.PodGroupPolicySource{ - KubeScheduling: &workloadsv1alpha.KubeSchedulingPodGroupPolicySource{ - ScheduleTimeoutSeconds: ptr.To(int32(30)), - }, - }, + UID: "rbg-test-uid", + Annotations: map[string]string{ + constants.GangSchedulingAnnotationKey: "true", + constants.GangSchedulingScheduleTimeoutSecondsKey: "30", }, - Roles: []workloadsv1alpha.RoleSpec{ + }, + Spec: workloadsv1alpha2.RoleBasedGroupSpec{ + Roles: []workloadsv1alpha2.RoleSpec{ { Name: "role1", - Replicas: ptr.To[int32](5), // Updated replica count + Replicas: ptr.To[int32](5), }, }, }, @@ -231,53 +183,28 @@ func 
TestPodGroupScheduler_Reconcile(t *testing.T) { expectPG: true, expectError: false, }, - { - name: "delete pod group when gang scheduling disabled and pod group exists", - client: fake.NewClientBuilder().WithScheme(scheme).WithObjects(podGroup).Build(), - rbg: wrappers.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). - WithKubeGangScheduling(false).Obj(), - preFunc: func() { - watchedWorkload.LoadOrStore(KubePodGroupCrdName, struct{}{}) - runtimeController.Owns(&schedv1alpha1.PodGroup{}) - }, - expectPG: false, - expectError: false, - }, - { - name: "do nothing when gang scheduling disabled and pod group not exists", - client: fake.NewClientBuilder().WithScheme(scheme).Build(), - rbg: wrappers.BuildBasicRoleBasedGroup(rbgName, rbgNamespace). - WithKubeGangScheduling(false).Obj(), - preFunc: func() { - watchedWorkload.LoadOrStore(KubePodGroupCrdName, struct{}{}) - runtimeController.Owns(&schedv1alpha1.PodGroup{}) - }, - expectPG: false, - expectError: false, - }, } for _, tt := range tests { t.Run( tt.name, func(t *testing.T) { - - scheduler := NewPodGroupScheduler(tt.client) + mgr := NewPodGroupManager(tt.pluginType, tt.client) ctx := log.IntoContext(context.TODO(), zap.New().WithValues("env", "test")) if tt.preFunc != nil { tt.preFunc() } - err := scheduler.Reconcile(ctx, tt.rbg, &runtimeController, &watchedWorkload, tt.apiReader) + err := mgr.ReconcilePodGroup(ctx, tt.rbg, &runtimeController, &watchedWorkload, tt.apiReader) // Verify if (err != nil) != tt.expectError { - t.Errorf("PodGroupScheduler.Reconcile() error = %v, expectError %v", err, tt.expectError) + t.Errorf("PodGroupManager.ReconcilePodGroup() error = %v, expectError %v", err, tt.expectError) } // Check if pod group exists or not var obj client.Object - if tt.rbg.IsVolcanoGangScheduling() { + if tt.pluginType == VolcanoSchedulerPlugin { pg := &volcanoschedulingv1beta1.PodGroup{} - err = scheduler.client.Get( + err = tt.client.Get( context.Background(), types.NamespacedName{ Name: tt.rbg.Name, 
Namespace: tt.rbg.Namespace, @@ -286,7 +213,7 @@ func TestPodGroupScheduler_Reconcile(t *testing.T) { obj = pg } else { pg := &schedv1alpha1.PodGroup{} - err = scheduler.client.Get( + err = tt.client.Get( context.Background(), types.NamespacedName{ Name: tt.rbg.Name, Namespace: tt.rbg.Namespace, diff --git a/pkg/scheduler/volcano/manager.go b/pkg/scheduler/volcano/manager.go new file mode 100644 index 00000000..a8e5f36d --- /dev/null +++ b/pkg/scheduler/volcano/manager.go @@ -0,0 +1,179 @@ +/* +Copyright 2024 The RoleBasedGroup Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package volcano implements the PodGroupManager interface for +// the Volcano PodGroup (scheduling.volcano.sh). +package volcano + +import ( + "context" + "fmt" + "sync" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + coreapplyv1 "k8s.io/client-go/applyconfigurations/core/v1" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" + "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/utils" + volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" +) + +const ( + // CrdName is the CRD name for the Volcano PodGroup. 
+	CrdName = "podgroups.scheduling.volcano.sh"
+
+	// AnnotationKey is the pod annotation key used to associate a pod with a Volcano PodGroup.
+	AnnotationKey = "scheduling.k8s.io/group-name"
+)
+
+// PodGroupManager manages Volcano PodGroups for gang scheduling.
+type PodGroupManager struct {
+	client client.Client
+}
+
+// New returns a new PodGroupManager for Volcano.
+func New(c client.Client) *PodGroupManager {
+	return &PodGroupManager{client: c}
+}
+
+// ReconcilePodGroup creates, updates, or deletes the Volcano PodGroup
+// based on the gang-scheduling annotation on the RBG.
+func (m *PodGroupManager) ReconcilePodGroup(
+	ctx context.Context,
+	rbg *workloadsv1alpha2.RoleBasedGroup,
+	runtimeController *builder.TypedBuilder[reconcile.Request],
+	watchedWorkload *sync.Map,
+	apiReader client.Reader,
+) error {
+	if !isGangSchedulingEnabled(rbg) {
+		return m.deletePodGroup(ctx, rbg, watchedWorkload)
+	}
+
+	if _, loaded := watchedWorkload.Load(CrdName); !loaded {
+		if err := utils.CheckCrdExists(apiReader, CrdName); err != nil {
+			return fmt.Errorf("scheduling plugin %s not ready: %w", CrdName, err)
+		}
+		watchedWorkload.LoadOrStore(CrdName, struct{}{})
+		runtimeController.Owns(&volcanoschedulingv1beta1.PodGroup{})
+	}
+
+	return m.createOrUpdate(ctx, rbg)
+}
+
+// InjectPodGroupLabels injects the Volcano PodGroup annotation into the pod template spec.
+func (m *PodGroupManager) InjectPodGroupLabels( + rbg *workloadsv1alpha2.RoleBasedGroup, + pts *coreapplyv1.PodTemplateSpecApplyConfiguration, +) { + if isGangSchedulingEnabled(rbg) { + pts.WithAnnotations(map[string]string{AnnotationKey: rbg.Name}) + } +} + +func isGangSchedulingEnabled(rbg *workloadsv1alpha2.RoleBasedGroup) bool { + return rbg.Annotations[constants.GangSchedulingAnnotationKey] == "true" +} + +func (m *PodGroupManager) createOrUpdate(ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup) error { + logger := log.FromContext(ctx) + queue := rbg.Annotations[constants.GangSchedulingVolcanoQueueKey] + priorityClassName := rbg.Annotations[constants.GangSchedulingVolcanoPriorityClassKey] + + podGroup := &volcanoschedulingv1beta1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: rbg.Name, + Namespace: rbg.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(rbg, utils.GetRbgGVK()), + }, + }, + Spec: volcanoschedulingv1beta1.PodGroupSpec{ + MinMember: int32(rbg.GetGroupSize()), + Queue: queue, + PriorityClassName: priorityClassName, + }, + } + + err := m.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup) + if err != nil && !apierrors.IsNotFound(err) { + logger.Error(err, "get pod group error") + return err + } + + if apierrors.IsNotFound(err) { + if createErr := m.client.Create(ctx, podGroup); createErr != nil { + logger.Error(createErr, "create pod group error") + return createErr + } + return nil + } + + if podGroup.Spec.MinMember != int32(rbg.GetGroupSize()) || + podGroup.Spec.Queue != queue || + podGroup.Spec.PriorityClassName != priorityClassName { + updateErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { + if fetchErr := m.client.Get( + ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup, + ); fetchErr != nil { + return fetchErr + } + podGroup.Spec.MinMember = int32(rbg.GetGroupSize()) + podGroup.Spec.Queue = queue + 
podGroup.Spec.PriorityClassName = priorityClassName + return m.client.Update(ctx, podGroup) + }) + if updateErr != nil { + logger.Error(updateErr, "update pod group error") + return updateErr + } + } + + return nil +} + +func (m *PodGroupManager) deletePodGroup( + ctx context.Context, + rbg *workloadsv1alpha2.RoleBasedGroup, + watchedWorkload *sync.Map, +) error { + if _, loaded := watchedWorkload.Load(CrdName); !loaded { + return nil + } + + podGroup := &volcanoschedulingv1beta1.PodGroup{} + err := m.client.Get(ctx, types.NamespacedName{Name: rbg.Name, Namespace: rbg.Namespace}, podGroup) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + if metav1.IsControlledBy(podGroup, rbg) { + if deleteErr := m.client.Delete(ctx, podGroup); deleteErr != nil { + return deleteErr + } + } + + return nil +} diff --git a/test/e2e/framework/rbg_v2_expect.go b/test/e2e/framework/rbg_v2_expect.go index a3ecc9ab..00ada844 100644 --- a/test/e2e/framework/rbg_v2_expect.go +++ b/test/e2e/framework/rbg_v2_expect.go @@ -166,6 +166,22 @@ func (f *Framework) ExpectWorkloadV2PodTemplateLabelContains( ).Should(gomega.BeTrue()) } +// ExpectWorkloadV2PodTemplateAnnotationContains checks that the workload's pod template has the given annotations. +func (f *Framework) ExpectWorkloadV2PodTemplateAnnotationContains( + rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, + annotations ...map[string]string, +) { + wlType := workloadTypeFromRoleV2(role) + wlCheck, err := workloads.NewWorkloadEqualCheckerV2(f.Ctx, f.Client, wlType) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + + gomega.Eventually( + func() bool { + return wlCheck.ExpectPodTemplateAnnotationContainsV2(rbg, role, annotations...) == nil + }, utils.Timeout, utils.Interval, + ).Should(gomega.BeTrue()) +} + // ExpectWorkloadV2ExclusiveTopology checks that the workload has the correct exclusive topology affinity. 
func (f *Framework) ExpectWorkloadV2ExclusiveTopology( rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, topologyKey string, diff --git a/test/e2e/framework/workloads/workload_v2_expect.go b/test/e2e/framework/workloads/workload_v2_expect.go index 30f19af1..3adb57f8 100644 --- a/test/e2e/framework/workloads/workload_v2_expect.go +++ b/test/e2e/framework/workloads/workload_v2_expect.go @@ -20,6 +20,7 @@ import ( type WorkloadV2EqualChecker interface { ExpectWorkloadEqualV2(rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec) error ExpectPodTemplateLabelContainsV2(rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, labels ...map[string]string) error + ExpectPodTemplateAnnotationContainsV2(rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, annotations ...map[string]string) error ExpectWorkloadNotExistV2(rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec) error ExpectTopologyAffinityV2(rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, topologyKey string) error } @@ -109,6 +110,33 @@ func (s *RoleInstanceSetCheckerV2) ExpectPodTemplateLabelContainsV2( return nil } +func (s *RoleInstanceSetCheckerV2) ExpectPodTemplateAnnotationContainsV2( + rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, annotationMaps ...map[string]string, +) error { + podList := &corev1.PodList{} + err := s.client.List(s.ctx, podList, + client.InNamespace(rbg.Namespace), + client.MatchingLabels{ + constants.GroupNameLabelKey: rbg.Name, + constants.RoleNameLabelKey: role.Name, + }, + ) + if err != nil { + return fmt.Errorf("list pods for role %s failed: %w", role.Name, err) + } + if len(podList.Items) == 0 { + return fmt.Errorf("no pods found for role %s", role.Name) + } + for _, annotationMap := range annotationMaps { + for key, value := range annotationMap { + if !utils.MapContains(podList.Items[0].Annotations, key, value) { + return fmt.Errorf("pod 
annotations missing key=%s value=%s", key, value) + } + } + } + return nil +} + func (s *RoleInstanceSetCheckerV2) ExpectWorkloadNotExistV2( rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, ) error { @@ -235,6 +263,12 @@ func (s *LeaderWorkerSetCheckerV2) ExpectPodTemplateLabelContainsV2( return nil } +func (s *LeaderWorkerSetCheckerV2) ExpectPodTemplateAnnotationContainsV2( + rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, annotations ...map[string]string, +) error { + return (&RoleInstanceSetCheckerV2{ctx: s.ctx, client: s.client}).ExpectPodTemplateAnnotationContainsV2(rbg, role, annotations...) +} + func (s *LeaderWorkerSetCheckerV2) ExpectWorkloadNotExistV2( rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, ) error { @@ -286,6 +320,12 @@ func (s *DeploymentCheckerV2) ExpectPodTemplateLabelContainsV2( return (&RoleInstanceSetCheckerV2{ctx: s.ctx, client: s.client}).ExpectPodTemplateLabelContainsV2(rbg, role, labels...) } +func (s *DeploymentCheckerV2) ExpectPodTemplateAnnotationContainsV2( + rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, annotations ...map[string]string, +) error { + return (&RoleInstanceSetCheckerV2{ctx: s.ctx, client: s.client}).ExpectPodTemplateAnnotationContainsV2(rbg, role, annotations...) +} + func (s *DeploymentCheckerV2) ExpectWorkloadNotExistV2( rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, ) error { @@ -323,6 +363,12 @@ func (s *StatefulSetCheckerV2) ExpectPodTemplateLabelContainsV2( return (&RoleInstanceSetCheckerV2{ctx: s.ctx, client: s.client}).ExpectPodTemplateLabelContainsV2(rbg, role, labels...) 
} +func (s *StatefulSetCheckerV2) ExpectPodTemplateAnnotationContainsV2( + rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, annotations ...map[string]string, +) error { + return (&RoleInstanceSetCheckerV2{ctx: s.ctx, client: s.client}).ExpectPodTemplateAnnotationContainsV2(rbg, role, annotations...) +} + func (s *StatefulSetCheckerV2) ExpectWorkloadNotExistV2( rbg *workloadsv1alpha2.RoleBasedGroup, role workloadsv1alpha2.RoleSpec, ) error { diff --git a/test/e2e/testcase/v1alpha2/rbg.go b/test/e2e/testcase/v1alpha2/rbg.go index 6adafe17..2d21dfc8 100644 --- a/test/e2e/testcase/v1alpha2/rbg.go +++ b/test/e2e/testcase/v1alpha2/rbg.go @@ -6,6 +6,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2" "sigs.k8s.io/rbgs/pkg/constants" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/test/e2e/framework" "sigs.k8s.io/rbgs/test/utils" wrappersv2 "sigs.k8s.io/rbgs/test/wrappers/v1alpha2" @@ -108,6 +109,52 @@ func RunRbgControllerTestCases(f *framework.Framework) { }, ) + ginkgo.It( + "rbg with kube gang scheduling", func() { + rbg := wrappersv2.BuildBasicRoleBasedGroup("e2e-test", f.Namespace). + WithGangScheduling(). + WithRoles( + []workloadsv1alpha2.RoleSpec{ + wrappersv2.BuildStandaloneRole("prefill").Obj(), + wrappersv2.BuildStandaloneRole("decode").Obj(), + }, + ).Obj() + + ginkgo.DeferCleanup(func() { dumpDebugInfo(f, rbg) }) + + gomega.Expect(f.Client.Create(f.Ctx, rbg)).Should(gomega.Succeed()) + + podGroupLabel := map[string]string{ + scheduler.KubePodGroupLabelKey: rbg.Name, + } + f.ExpectWorkloadV2PodTemplateLabelContains(rbg, rbg.Spec.Roles[0], podGroupLabel) + }, + ) + + ginkgo.PIt( + "rbg with volcano gang scheduling", func() { + template := wrappersv2.BuildBasicPodTemplateSpec() + template.Spec.SchedulerName = "volcano" + rbg := wrappersv2.BuildBasicRoleBasedGroup("e2e-test", f.Namespace). + WithVolcanoGangScheduling("default"). 
+ WithRoles( + []workloadsv1alpha2.RoleSpec{ + wrappersv2.BuildStandaloneRole("prefill").WithTemplate(&template).Obj(), + wrappersv2.BuildStandaloneRole("decode").WithTemplate(&template).Obj(), + }, + ).Obj() + + ginkgo.DeferCleanup(func() { dumpDebugInfo(f, rbg) }) + + gomega.Expect(f.Client.Create(f.Ctx, rbg)).Should(gomega.Succeed()) + + podGroupAnnotation := map[string]string{ + scheduler.VolcanoPodGroupAnnotationKey: rbg.Name, + } + f.ExpectWorkloadV2PodTemplateAnnotationContains(rbg, rbg.Spec.Roles[0], podGroupAnnotation) + }, + ) + ginkgo.It( "rbg with exclusive-topology", func() { topologyKey := "kubernetes.io/hostname" diff --git a/test/envtest/testutil/setup.go b/test/envtest/testutil/setup.go index 57cfdad7..5009dbd1 100644 --- a/test/envtest/testutil/setup.go +++ b/test/envtest/testutil/setup.go @@ -40,6 +40,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" workloadsv1alpha1 "sigs.k8s.io/rbgs/api/workloads/v1alpha1" workloadscontroller "sigs.k8s.io/rbgs/internal/controller/workloads" + "sigs.k8s.io/rbgs/pkg/scheduler" "sigs.k8s.io/rbgs/pkg/utils/fieldindex" ) @@ -191,7 +192,7 @@ func SetupManager(ctx context.Context) (manager.Manager, error) { // SetupRBGController sets up the RoleBasedGroup controller func SetupRBGController(mgr manager.Manager) error { - rbgReconciler := workloadscontroller.NewRoleBasedGroupReconciler(mgr) + rbgReconciler := workloadscontroller.NewRoleBasedGroupReconciler(mgr, scheduler.KubeSchedulerPlugin) return rbgReconciler.SetupWithManager(mgr, controller.Options{}) } diff --git a/test/wrappers/v1alpha2/rbg_wrapper.go b/test/wrappers/v1alpha2/rbg_wrapper.go index 83bcfdbb..ad990d12 100644 --- a/test/wrappers/v1alpha2/rbg_wrapper.go +++ b/test/wrappers/v1alpha2/rbg_wrapper.go @@ -6,6 +6,28 @@ import ( "sigs.k8s.io/rbgs/pkg/constants" ) +// WithGangScheduling enables kube scheduler-plugins based gang scheduling +// by setting the GangSchedulingAnnotationKey annotation to "true". 
+func (rbgWrapper *RoleBasedGroupWrapper) WithGangScheduling() *RoleBasedGroupWrapper { + if rbgWrapper.Annotations == nil { + rbgWrapper.Annotations = make(map[string]string) + } + rbgWrapper.Annotations[constants.GangSchedulingAnnotationKey] = "true" + return rbgWrapper +} + +// WithVolcanoGangScheduling enables Volcano based gang scheduling +// by setting the GangSchedulingAnnotationKey annotation to "true" and +// the GangSchedulingVolcanoQueueKey annotation to the given queue. +func (rbgWrapper *RoleBasedGroupWrapper) WithVolcanoGangScheduling(queue string) *RoleBasedGroupWrapper { + if rbgWrapper.Annotations == nil { + rbgWrapper.Annotations = make(map[string]string) + } + rbgWrapper.Annotations[constants.GangSchedulingAnnotationKey] = "true" + rbgWrapper.Annotations[constants.GangSchedulingVolcanoQueueKey] = queue + return rbgWrapper +} + type RoleBasedGroupWrapper struct { workloadsv1alpha2.RoleBasedGroup }