Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion cmd/rbgs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"
workloadscontroller "sigs.k8s.io/rbgs/internal/controller/workloads"
"sigs.k8s.io/rbgs/pkg/constants"
"sigs.k8s.io/rbgs/pkg/scheduler"
"sigs.k8s.io/rbgs/pkg/utils/fieldindex"
"sigs.k8s.io/rbgs/version"
// +kubebuilder:scaffold:imports
Expand Down Expand Up @@ -101,6 +102,8 @@ func main() {
// Controller runtime options
maxConcurrentReconciles int
cacheSyncTimeout time.Duration
// Gang scheduling scheduler name: scheduler-plugins or volcano
schedulerPlugin string
)
flag.StringVar(
&metricsAddr, "metrics-bind-address", "0", "The address the metrics endpoint binds to. "+
Expand Down Expand Up @@ -135,6 +138,11 @@ func main() {
"The number of worker threads used by the the RBGS controller.",
)
flag.DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 120*time.Second, "Informer cache sync timeout.")
flag.StringVar(
&schedulerPlugin, "scheduler-name", string(scheduler.KubeSchedulerPlugin),
"The scheduler name to use for gang scheduling. Supported values: scheduler-plugins, volcano. "+
"Defaults to scheduler-plugins.",
)

flag.Parse()
opts := zap.Options{
Expand Down Expand Up @@ -282,7 +290,7 @@ func main() {
CacheSyncTimeout: cacheSyncTimeout,
}

rbgReconciler := workloadscontroller.NewRoleBasedGroupReconciler(mgr)
rbgReconciler := workloadscontroller.NewRoleBasedGroupReconciler(mgr, scheduler.SchedulerPluginType(schedulerPlugin))
if err = rbgReconciler.CheckCrdExists(); err != nil {
setupLog.Error(err, "unable to create rbg controller", "controller", "RoleBasedGroup")
os.Exit(1)
Expand Down
1 change: 1 addition & 0 deletions deploy/helm/rbgs/templates/manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ spec:
- --metrics-bind-address=:8443
- --leader-elect
- --health-probe-bind-address=:8081
- --scheduler-name={{ .Values.schedulerName | default "scheduler-plugins" }}
command:
- /manager
securityContext:
Expand Down
5 changes: 5 additions & 0 deletions deploy/helm/rbgs/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ nodeSelector: {}

tolerations: []

# Gang scheduling scheduler name configuration.
# Supported values: scheduler-plugins, volcano
# Defaults to scheduler-plugins.
schedulerName: scheduler-plugins

crdUpgrade:
# Whether to enable CRD Upgrader Job (runs before install/upgrade)
enabled: true
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/workloads/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@ const (
Succeed = "Succeed"
FailedUpdateStatus = "FailedUpdateStatus"
FailedCreatePodGroup = "FailedCreatePodGroup"
FailedReconcilePodGroup = "FailedReconcilePodGroup"
FailedCreateRevision = "FailedCreateRevision"
FailedReconcileDiscoveryConfigMap = "FailedReconcileDiscoveryConfigMap"
SucceedCreateRevision = "SucceedCreateRevision"
// InvalidGangSchedulingAnnotations is emitted when group-gang-scheduling and
// role-instance-gang-scheduling annotations are set simultaneously on the same RBG.
InvalidGangSchedulingAnnotations = "InvalidGangSchedulingAnnotations"
)

// rbg-scaling-adapter events
Expand Down
46 changes: 43 additions & 3 deletions internal/controller/workloads/rolebasedgroup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ type RoleBasedGroupReconciler struct {
recorder record.EventRecorder
workloadReconciler map[string]reconciler.WorkloadReconciler
reconcilerMu sync.RWMutex
podGroupManager scheduler.PodGroupManager
}

func NewRoleBasedGroupReconciler(mgr ctrl.Manager) *RoleBasedGroupReconciler {
func NewRoleBasedGroupReconciler(mgr ctrl.Manager, schedulerPlugin scheduler.SchedulerPluginType) *RoleBasedGroupReconciler {
return &RoleBasedGroupReconciler{
client: mgr.GetClient(),
apiReader: mgr.GetAPIReader(),
scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("RoleBasedGroup"),
workloadReconciler: make(map[string]reconciler.WorkloadReconciler),
podGroupManager: scheduler.NewPodGroupManager(schedulerPlugin, mgr.GetClient()),
}
}

Expand Down Expand Up @@ -170,12 +172,18 @@ func (r *RoleBasedGroupReconciler) Reconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{}, err
}

// Step 6: Reconcile roles, do create/update actions for roles.
// Step 6: Reconcile PodGroup for gang scheduling (annotation-driven).
if err := r.reconcilePodGroup(ctx, rbg); err != nil {
r.recorder.Event(rbg, corev1.EventTypeWarning, FailedReconcilePodGroup, err.Error())
return ctrl.Result{}, err
}

// Step 7: Reconcile roles, do create/update actions for roles.
if err := r.reconcileRoles(ctx, rbg, expectedRolesRevisionHash, scalingTargets, rollingUpdateStrategies); err != nil {
return ctrl.Result{}, err
}

// Step 7: Cleanup orphaned resources
// Step 8: Cleanup orphaned resources
if err := r.cleanup(ctx, rbg); err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -221,6 +229,23 @@ func (r *RoleBasedGroupReconciler) handleRevisions(ctx context.Context, rbg *wor

func (r *RoleBasedGroupReconciler) preCheck(ctx context.Context, rbg *workloadsv1alpha2.RoleBasedGroup) error {
logger := log.FromContext(ctx)

// Validate that group-gang-scheduling and role-instance-gang-scheduling are not both set
// on the RBG metadata.annotations, as they are mutually exclusive at the RBG level.
if rbg.Annotations[constants.GangSchedulingAnnotationKey] == "true" &&
rbg.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey] == "true" {
err := fmt.Errorf(
"annotations %q and %q cannot be set simultaneously on the same RoleBasedGroup; "+
"use %q for group-level gang scheduling, or set %q per role via role.Annotations",
constants.GangSchedulingAnnotationKey,
constants.RoleInstanceGangSchedulingAnnotationKey,
constants.GangSchedulingAnnotationKey,
constants.RoleInstanceGangSchedulingAnnotationKey,
)
r.recorder.Event(rbg, corev1.EventTypeWarning, InvalidGangSchedulingAnnotations, err.Error())
return err
}

// Validate RoleTemplates
if err := workloadsv1alpha2.ValidateRoleTemplates(rbg); err != nil {
r.recorder.Event(rbg, corev1.EventTypeWarning, InvalidRoleTemplates, err.Error())
Expand Down Expand Up @@ -369,6 +394,16 @@ func (r *RoleBasedGroupReconciler) reconcileRefinedDiscoveryConfigMap(
return utils.PatchObjectApplyConfiguration(ctx, r.client, cmApplyConfig, utils.PatchSpec)
}

func (r *RoleBasedGroupReconciler) reconcilePodGroup(
ctx context.Context,
rbg *workloadsv1alpha2.RoleBasedGroup,
) error {
if r.podGroupManager == nil {
return nil
}
return r.podGroupManager.ReconcilePodGroup(ctx, rbg, runtimeController, &watchedWorkload, r.apiReader)
}

func (r *RoleBasedGroupReconciler) reconcileRoles(
ctx context.Context,
rbg *workloadsv1alpha2.RoleBasedGroup,
Expand Down Expand Up @@ -534,6 +569,11 @@ func (r *RoleBasedGroupReconciler) getOrCreateWorkloadReconciler(
return nil, err
}

// Inject PodGroupManager if the reconciler supports it (PodGroupManagerSetter).
if setter, ok := rec.(reconciler.PodGroupManagerSetter); ok {
setter.SetPodGroupManager(r.podGroupManager)
}

// Cache the reconciler
r.workloadReconciler[workloadType] = rec
return rec, nil
Expand Down
40 changes: 40 additions & 0 deletions pkg/constants/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,27 @@ const (
// DisableExclusiveKeyAnnotationKey can be set to "true" on a Pod template
// to skip exclusive-topology affinity injection for that pod.
DisableExclusiveKeyAnnotationKey = RBGPrefix + "role-disable-exclusive"

// GangSchedulingAnnotationKey enables gang scheduling for a RoleBasedGroup when set to "true".
// When enabled, the controller will create a PodGroup CR managed by the scheduler
// configured via --scheduler-name flag (scheduler-plugins or volcano).
// Setting this annotation automatically derives RoleInstanceGangSchedulingAnnotationKey
// for each role's RoleInstanceSet, so they must NOT be set simultaneously.
// Example: rbg.workloads.x-k8s.io/gang-scheduling: "true"
GangSchedulingAnnotationKey = RBGPrefix + "group-gang-scheduling"

// GangSchedulingScheduleTimeoutSecondsKey specifies the schedule timeout seconds for
// scheduler-plugins based gang scheduling. Defaults to 60 seconds if not set.
// Example: rbg.workloads.x-k8s.io/gang-scheduling-timeout: "120"
GangSchedulingScheduleTimeoutSecondsKey = RBGPrefix + "group-gang-scheduling-timeout"

// GangSchedulingVolcanoPriorityClassKey specifies the PriorityClassName for volcano gang scheduling.
// Example: rbg.workloads.x-k8s.io/gang-scheduling-volcano-priority: "system-node-critical"
GangSchedulingVolcanoPriorityClassKey = RBGPrefix + "group-gang-scheduling-volcano-priority"

// GangSchedulingVolcanoQueueKey specifies the Queue for volcano gang scheduling.
// Example: rbg.workloads.x-k8s.io/gang-scheduling-volcano-queue: "default"
GangSchedulingVolcanoQueueKey = RBGPrefix + "group-gang-scheduling-volcano-queue"
)

// Role level annotations
Expand All @@ -44,6 +65,25 @@ const (
// RoleInstancePatternKey identifies the RoleInstance organization pattern (Stateful/Stateless)
RoleInstancePatternKey = RBGPrefix + "role-instance-pattern"

// RoleInstanceGangSchedulingAnnotationKey enables gang-scheduling aware behavior at the
// RoleInstance level when set to "true". It is derived automatically from the RBG-level
// GangSchedulingAnnotationKey annotation during RoleInstanceSet reconciliation, but users
// can also set it explicitly in role.Annotations within the RBG spec.
//
// NOTE: This annotation must NOT be set on the RBG object (metadata.annotations) directly
// when GangSchedulingAnnotationKey is already set, as they are mutually exclusive at the
// RBG level. Use either GangSchedulingAnnotationKey (group-level) or set
// RoleInstanceGangSchedulingAnnotationKey per role via role.Annotations, not both.
//
// When enabled, the RoleInstance controller enforces gang-scheduling constraints:
// 1. If any orphan pod (not yet GC'd) exists, pod creation fails immediately instead
// of silently skipping — preventing partial group startup.
// 2. If an in-place update cannot be applied to a pod, all pods of the instance are
// recreated atomically so the PodGroup minimum member requirement is met.
//
// Example: rbg.workloads.x-k8s.io/role-instance-gang-scheduling: "true"
RoleInstanceGangSchedulingAnnotationKey = RBGPrefix + "role-instance-gang-scheduling"

// DiscoveryConfigModeAnnotationKey identifies discovery config handling mode.
DiscoveryConfigModeAnnotationKey = RBGPrefix + "discovery-config-mode"
)
Expand Down
12 changes: 10 additions & 2 deletions pkg/reconciler/deploy_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2"
"sigs.k8s.io/rbgs/pkg/constants"
"sigs.k8s.io/rbgs/pkg/scheduler"
"sigs.k8s.io/rbgs/pkg/utils"
)

type DeploymentReconciler struct {
scheme *runtime.Scheme
client client.Client
scheme *runtime.Scheme
client client.Client
podGroupManager scheduler.PodGroupManager
}

var _ WorkloadReconciler = &DeploymentReconciler{}
Expand All @@ -36,6 +38,11 @@ func NewDeploymentReconciler(scheme *runtime.Scheme, client client.Client) *Depl
return &DeploymentReconciler{scheme: scheme, client: client}
}

// SetPodGroupManager implements PodGroupManagerSetter.
func (r *DeploymentReconciler) SetPodGroupManager(m scheduler.PodGroupManager) {
r.podGroupManager = m
}

func (r *DeploymentReconciler) Validate(
ctx context.Context, role *workloadsv1alpha2.RoleSpec) error {
logger := log.FromContext(ctx)
Expand Down Expand Up @@ -112,6 +119,7 @@ func (r *DeploymentReconciler) constructDeployApplyConfiguration(
}

podReconciler := NewPodReconciler(r.scheme, r.client)
podReconciler.SetPodGroupManager(r.podGroupManager)
podTemplateApplyConfiguration, err := podReconciler.ConstructPodTemplateSpecApplyConfiguration(
ctx, rbg, role, maps.Clone(matchLabels),
)
Expand Down
13 changes: 11 additions & 2 deletions pkg/reconciler/lws_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
lwsapplyv1 "sigs.k8s.io/lws/client-go/applyconfiguration/leaderworkerset/v1"
workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2"
"sigs.k8s.io/rbgs/pkg/constants"
"sigs.k8s.io/rbgs/pkg/scheduler"
"sigs.k8s.io/rbgs/pkg/utils"
)

type LeaderWorkerSetReconciler struct {
scheme *runtime.Scheme
client client.Client
scheme *runtime.Scheme
client client.Client
podGroupManager scheduler.PodGroupManager
}

var _ WorkloadReconciler = &LeaderWorkerSetReconciler{}
Expand All @@ -41,6 +43,11 @@ func NewLeaderWorkerSetReconciler(scheme *runtime.Scheme, client client.Client)
return &LeaderWorkerSetReconciler{scheme: scheme, client: client}
}

// SetPodGroupManager implements PodGroupManagerSetter.
func (r *LeaderWorkerSetReconciler) SetPodGroupManager(m scheduler.PodGroupManager) {
r.podGroupManager = m
}

func (r *LeaderWorkerSetReconciler) Validate(
ctx context.Context, role *workloadsv1alpha2.RoleSpec) error {
logger := log.FromContext(ctx)
Expand Down Expand Up @@ -226,6 +233,7 @@ func (r *LeaderWorkerSetReconciler) constructLWSApplyConfiguration(

// leaderTemplate
podReconciler := NewPodReconciler(r.scheme, r.client)
podReconciler.SetPodGroupManager(r.podGroupManager)
// KEP-8: use applyStrategicMergePatch
leaderTemp, err := applyStrategicMergePatch(baseTemplate, leaderPatch)
if err != nil {
Expand All @@ -247,6 +255,7 @@ func (r *LeaderWorkerSetReconciler) constructLWSApplyConfiguration(
return nil, err
}
workerPodReconciler := NewPodReconciler(r.scheme, r.client)
workerPodReconciler.SetPodGroupManager(r.podGroupManager)
// workerTemplate do not need to inject sidecar
workerPodReconciler.SetInjectors([]string{"config", "common_env"})
workerTemplateApplyCfg, err := workerPodReconciler.ConstructPodTemplateSpecApplyConfiguration(
Expand Down
19 changes: 16 additions & 3 deletions pkg/reconciler/pod_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2"
"sigs.k8s.io/rbgs/pkg/constants"
"sigs.k8s.io/rbgs/pkg/discovery"
"sigs.k8s.io/rbgs/pkg/scheduler"
"sigs.k8s.io/rbgs/pkg/utils"
)

type PodReconciler struct {
scheme *runtime.Scheme
client client.Client
injectObjects []string
scheme *runtime.Scheme
client client.Client
injectObjects []string
podGroupManager scheduler.PodGroupManager
}

func NewPodReconciler(scheme *runtime.Scheme, client client.Client) *PodReconciler {
Expand All @@ -37,6 +39,12 @@ func (r *PodReconciler) SetInjectors(injectObjects []string) {
r.injectObjects = injectObjects
}

// SetPodGroupManager configures the PodGroupManager used to inject gang-scheduling
// labels/annotations into pod templates.
func (r *PodReconciler) SetPodGroupManager(m scheduler.PodGroupManager) {
r.podGroupManager = m
}

func (r *PodReconciler) ConstructPodTemplateSpecApplyConfiguration(
ctx context.Context,
rbg *workloadsv1alpha2.RoleBasedGroup,
Expand Down Expand Up @@ -123,6 +131,11 @@ func (r *PodReconciler) ConstructPodTemplateSpecApplyConfiguration(
return nil, err
}

// Inject gang-scheduling labels/annotations if a PodGroupManager is configured.
if r.podGroupManager != nil {
r.podGroupManager.InjectPodGroupLabels(rbg, podTemplateApplyConfiguration)
}

podTemplateApplyConfiguration.WithLabels(podLabels).WithAnnotations(podAnnotations)
return podTemplateApplyConfiguration, nil
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/reconciler/roleinstance/sync/instance_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/rbgs/pkg/constants"
"sigs.k8s.io/rbgs/pkg/utils"

workloadsv1alpha2 "sigs.k8s.io/rbgs/api/workloads/v1alpha2"
Expand Down Expand Up @@ -60,6 +61,23 @@ func (c *realControl) calculateDiffsWithExpectation(ctx context.Context, updateI
return &expectationDiff{toDeleteNum: len(pods), toDeletePod: pods}, nil
}

if isGangSchedulingEnabled(updateInstance) {
for i := range pods {
oldRevision := currentRevision
for _, r := range revisions {
if instanceutil.EqualToRevisionHash("", pods[i], r.Name) {
oldRevision = r
break
}
}
if !c.inplaceControl.CanUpdateInPlace(ctx, oldRevision, updateRevision, coreControl.GetUpdateOptions()) {
c.recorder.Event(updateInstance, v1.EventTypeNormal, "ReCreateInstance", fmt.Sprintf("component %s can't inplace updated, "+
"recreate all pods of instance: %v", instanceutil.GetPodComponentName(pods[i]), klog.KObj(updateInstance)))
return &expectationDiff{toDeleteNum: len(pods), toDeletePod: pods}, nil
}
}
}

var (
toDeleteNum = 0
toDeletePods []*v1.Pod
Expand Down Expand Up @@ -105,6 +123,9 @@ func (c *realControl) createPods(ctx context.Context, updateInstance *workloadsv
toCreatePodNum := 0
for _, p := range newPods {
if c.hasOrphanPod(p.Namespace, p.Name) {
if isGangSchedulingEnabled(updateInstance) {
return false, fmt.Errorf("orphan pod %v has not been gc, fail to create new pod", klog.KObj(p))
}
continue
}
toCreatePodNum++
Expand Down Expand Up @@ -221,3 +242,10 @@ func getExpectedPodCount(instance *workloadsv1alpha2.RoleInstance) int {
}
return expectedPodCount
}

// isGangSchedulingEnabled reports whether gang-scheduling constraints are active for the
// given RoleInstance. The annotation is derived from the parent RBG's gang-scheduling
// annotation during RoleInstanceSet reconciliation, or set directly via role.Annotations.
func isGangSchedulingEnabled(instance *workloadsv1alpha2.RoleInstance) bool {
return instance.Annotations[constants.RoleInstanceGangSchedulingAnnotationKey] == "true"
}
Loading
Loading