diff --git a/api/v1beta2/scheduledsparkapplication_types.go b/api/v1beta2/scheduledsparkapplication_types.go index 2801cd9247..fd6ee050d5 100644 --- a/api/v1beta2/scheduledsparkapplication_types.go +++ b/api/v1beta2/scheduledsparkapplication_types.go @@ -75,6 +75,10 @@ type ScheduledSparkApplicationStatus struct { ScheduleState ScheduleState `json:"scheduleState,omitempty"` // Reason tells why the ScheduledSparkApplication is in the particular ScheduleState. Reason string `json:"reason,omitempty"` + // ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication. + // It is used to detect spec changes and trigger re-evaluation of the schedule. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` } // +kubebuilder:object:root=true diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 7bc36f2e85..7880050bdb 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -12442,6 +12442,12 @@ spec: format: date-time nullable: true type: string + observedGeneration: + description: |- + ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication. + It is used to detect spec changes and trigger re-evaluation of the schedule. + format: int64 + type: integer pastFailedRunNames: description: PastFailedRunNames keeps the names of SparkApplications for past failed runs. diff --git a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 7bc36f2e85..7880050bdb 100644 --- a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -12442,6 +12442,12 @@ spec: format: date-time nullable: true type: string + observedGeneration: + description: |- + ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication. + It is used to detect spec changes and trigger re-evaluation of the schedule. + format: int64 + type: integer pastFailedRunNames: description: PastFailedRunNames keeps the names of SparkApplications for past failed runs. diff --git a/docs/api-docs.md b/docs/api-docs.md index 4cb58ed21a..e06c15154b 100644 --- a/docs/api-docs.md +++ b/docs/api-docs.md @@ -2254,6 +2254,19 @@ string

Reason tells why the ScheduledSparkApplication is in the particular ScheduleState.

+ + +observedGeneration
+ +int64 + + + +(Optional) +

ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication. +It is used to detect spec changes and trigger re-evaluation of the schedule.

+ +

SecretInfo diff --git a/internal/controller/scheduledsparkapplication/controller.go b/internal/controller/scheduledsparkapplication/controller.go index 733205450c..6f91b5dfb5 100644 --- a/internal/controller/scheduledsparkapplication/controller.go +++ b/internal/controller/scheduledsparkapplication/controller.go @@ -142,6 +142,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu logger.Error(err, "Failed to load timezone location", "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, "timezone", timezone) scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateFailedValidation scheduledApp.Status.Reason = fmt.Sprintf("Invalid timezone: %v", err) + scheduledApp.Status.ObservedGeneration = scheduledApp.Generation if updateErr := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); updateErr != nil { return ctrl.Result{Requeue: true}, updateErr } @@ -160,12 +161,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu logger.Error(parseErr, "Failed to parse schedule of ScheduledSparkApplication", "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, "schedule", scheduledApp.Spec.Schedule) scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateFailedValidation scheduledApp.Status.Reason = parseErr.Error() + scheduledApp.Status.ObservedGeneration = scheduledApp.Generation if updateErr := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); updateErr != nil { return ctrl.Result{Requeue: true}, updateErr } return ctrl.Result{}, nil } + // Detect spec changes and reset state to trigger re-evaluation. + specChanged := scheduledApp.Generation != scheduledApp.Status.ObservedGeneration + if specChanged { + switch scheduledApp.Status.ScheduleState { + case v1beta2.ScheduleStateFailedValidation: + // Reset to New so the updated spec gets re-validated. + logger.Info("Spec changed while in FailedValidation state, resetting to New", + "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, + "generation", scheduledApp.Generation, "observedGeneration", scheduledApp.Status.ObservedGeneration) + scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateNew + scheduledApp.Status.Reason = "" + scheduledApp.Status.NextRun = metav1.Time{} + } + } + switch scheduledApp.Status.ScheduleState { case v1beta2.ScheduleStateNew: now := r.clock.Now() @@ -175,12 +192,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu scheduledApp.Status.NextRun = metav1.NewTime(nextRunTime) } scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateScheduled + scheduledApp.Status.ObservedGeneration = scheduledApp.Generation if err := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); err != nil { return ctrl.Result{Requeue: true}, err } return ctrl.Result{RequeueAfter: nextRunTime.Sub(now)}, err case v1beta2.ScheduleStateScheduled: now := r.clock.Now() + + // Recalculate nextRun if the spec has changed. + if specChanged { + logger.Info("Spec changed, recalculating nextRun", + "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, + "generation", scheduledApp.Generation, "observedGeneration", scheduledApp.Status.ObservedGeneration) + nextRunTime := schedule.Next(now) + scheduledApp.Status.NextRun = metav1.NewTime(nextRunTime) + scheduledApp.Status.ObservedGeneration = scheduledApp.Generation + if err := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); err != nil { + return ctrl.Result{Requeue: true}, err + } + return ctrl.Result{RequeueAfter: nextRunTime.Sub(now)}, nil + } + nextRunTime := scheduledApp.Status.NextRun if nextRunTime.IsZero() { scheduledApp.Status.NextRun = metav1.NewTime(schedule.Next(now)) diff --git a/internal/controller/scheduledsparkapplication/controller_test.go b/internal/controller/scheduledsparkapplication/controller_test.go index c0c27834ff..a60757a263 100644 --- a/internal/controller/scheduledsparkapplication/controller_test.go +++ b/internal/controller/scheduledsparkapplication/controller_test.go @@ -93,6 +93,140 @@ var _ = Describe("ScheduledSparkApplication Controller", func() { }) }) +var _ = Describe("ScheduledSparkApplication spec change detection", func() { + Context("when spec.schedule changes in ScheduleStateScheduled", func() { + const resourceName = "repro-schedule-change" + ctx := context.Background() + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + + BeforeEach(func() { + resource := &v1beta2.ScheduledSparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: "default", + }, + Spec: v1beta2.ScheduledSparkApplicationSpec{ + Schedule: "10 * * * *", + ConcurrencyPolicy: v1beta2.ConcurrencyAllow, + Template: v1beta2.SparkApplicationSpec{ + Type: v1beta2.SparkApplicationTypeScala, + Mode: v1beta2.DeployModeCluster, + RestartPolicy: v1beta2.RestartPolicy{ + Type: v1beta2.RestartPolicyNever, + }, + MainApplicationFile: ptr.To("local:///dummy.jar"), + }, + }, + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + AfterEach(func() { + resource := &v1beta2.ScheduledSparkApplication{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + if err == nil { + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + } + }) + + It("should recalculate nextRun when spec.schedule changes", func() { + reconciler := NewReconciler(k8sClient.Scheme(), k8sClient, nil, clock.RealClock{}, Options{Namespaces: []string{"default"}, TimestampPrecision: "nanos"}) + + By("Reconciling to reach ScheduleStateScheduled") + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + // Verify it's in Scheduled state + app := &v1beta2.ScheduledSparkApplication{} + Expect(k8sClient.Get(ctx, typeNamespacedName, app)).To(Succeed()) + Expect(app.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateScheduled)) + oldNextRun := app.Status.NextRun + + By("Changing spec.schedule from '10 * * * *' to '50 * * * *'") + app.Spec.Schedule = "50 * * * *" + Expect(k8sClient.Update(ctx, app)).To(Succeed()) + + By("Reconciling again after schedule change") + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + updatedApp := &v1beta2.ScheduledSparkApplication{} + Expect(k8sClient.Get(ctx, typeNamespacedName, updatedApp)).To(Succeed()) + Expect(updatedApp.Status.NextRun.Time).NotTo(Equal(oldNextRun.Time), + "nextRun should be recalculated after spec.schedule change") + }) + }) + + Context("when spec is fixed after FailedValidation", func() { + const resourceName = "repro-failed-validation" + ctx := context.Background() + typeNamespacedName := types.NamespacedName{ + Name: resourceName, + Namespace: "default", + } + + BeforeEach(func() { + resource := &v1beta2.ScheduledSparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: "default", + }, + Spec: v1beta2.ScheduledSparkApplicationSpec{ + Schedule: "invalid-cron", + ConcurrencyPolicy: v1beta2.ConcurrencyAllow, + Template: v1beta2.SparkApplicationSpec{ + Type: v1beta2.SparkApplicationTypeScala, + Mode: v1beta2.DeployModeCluster, + RestartPolicy: v1beta2.RestartPolicy{ + Type: v1beta2.RestartPolicyNever, + }, + MainApplicationFile: ptr.To("local:///dummy.jar"), + }, + }, + } + Expect(k8sClient.Create(ctx, resource)).To(Succeed()) + }) + + AfterEach(func() { + resource := &v1beta2.ScheduledSparkApplication{} + err := k8sClient.Get(ctx, typeNamespacedName, resource) + if err == nil { + Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + } + }) + + It("should recover from FailedValidation when spec is fixed", func() { + reconciler := NewReconciler(k8sClient.Scheme(), k8sClient, nil, clock.RealClock{}, Options{Namespaces: []string{"default"}, TimestampPrecision: "nanos"}) + + By("Reconciling to reach ScheduleStateFailedValidation") + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + app := &v1beta2.ScheduledSparkApplication{} + Expect(k8sClient.Get(ctx, typeNamespacedName, app)).To(Succeed()) + Expect(app.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateFailedValidation)) + + By("Fixing spec.schedule to a valid cron expression") + app.Spec.Schedule = "10 * * * *" + Expect(k8sClient.Update(ctx, app)).To(Succeed()) + + By("Reconciling again after fixing the schedule") + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName}) + Expect(err).NotTo(HaveOccurred()) + + updatedApp := &v1beta2.ScheduledSparkApplication{} + Expect(k8sClient.Get(ctx, typeNamespacedName, updatedApp)).To(Succeed()) + Expect(updatedApp.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateScheduled), + "should recover from FailedValidation to Scheduled after spec is fixed") + Expect(updatedApp.Status.NextRun.IsZero()).To(BeFalse(), + "nextRun should be set after recovering from FailedValidation") + }) + }) +}) + var _ = Describe("formatTimestamp", func() { var testTime time.Time