diff --git a/api/v1beta2/scheduledsparkapplication_types.go b/api/v1beta2/scheduledsparkapplication_types.go
index 2801cd9247..fd6ee050d5 100644
--- a/api/v1beta2/scheduledsparkapplication_types.go
+++ b/api/v1beta2/scheduledsparkapplication_types.go
@@ -75,6 +75,10 @@ type ScheduledSparkApplicationStatus struct {
ScheduleState ScheduleState `json:"scheduleState,omitempty"`
// Reason tells why the ScheduledSparkApplication is in the particular ScheduleState.
Reason string `json:"reason,omitempty"`
+ // ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
+ // It is used to detect spec changes and trigger re-evaluation of the schedule.
+ // +optional
+ ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}
// +kubebuilder:object:root=true
diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
index 7bc36f2e85..7880050bdb 100644
--- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
+++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
@@ -12442,6 +12442,12 @@ spec:
format: date-time
nullable: true
type: string
+ observedGeneration:
+ description: |-
+ ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
+ It is used to detect spec changes and trigger re-evaluation of the schedule.
+ format: int64
+ type: integer
pastFailedRunNames:
description: PastFailedRunNames keeps the names of SparkApplications
for past failed runs.
diff --git a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
index 7bc36f2e85..7880050bdb 100644
--- a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
+++ b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml
@@ -12442,6 +12442,12 @@ spec:
format: date-time
nullable: true
type: string
+ observedGeneration:
+ description: |-
+ ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
+ It is used to detect spec changes and trigger re-evaluation of the schedule.
+ format: int64
+ type: integer
pastFailedRunNames:
description: PastFailedRunNames keeps the names of SparkApplications
for past failed runs.
diff --git a/docs/api-docs.md b/docs/api-docs.md
index 4cb58ed21a..e06c15154b 100644
--- a/docs/api-docs.md
+++ b/docs/api-docs.md
@@ -2254,6 +2254,19 @@ string
Reason tells why the ScheduledSparkApplication is in the particular ScheduleState.
+
+
+observedGeneration
+
+int64
+
+ |
+
+(Optional)
+ ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
+It is used to detect spec changes and trigger re-evaluation of the schedule.
+ |
+
SecretInfo
diff --git a/internal/controller/scheduledsparkapplication/controller.go b/internal/controller/scheduledsparkapplication/controller.go
index 733205450c..6f91b5dfb5 100644
--- a/internal/controller/scheduledsparkapplication/controller.go
+++ b/internal/controller/scheduledsparkapplication/controller.go
@@ -142,6 +142,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
logger.Error(err, "Failed to load timezone location", "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, "timezone", timezone)
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateFailedValidation
scheduledApp.Status.Reason = fmt.Sprintf("Invalid timezone: %v", err)
+ scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if updateErr := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); updateErr != nil {
return ctrl.Result{Requeue: true}, updateErr
}
@@ -160,12 +161,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
logger.Error(parseErr, "Failed to parse schedule of ScheduledSparkApplication", "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, "schedule", scheduledApp.Spec.Schedule)
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateFailedValidation
scheduledApp.Status.Reason = parseErr.Error()
+ scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if updateErr := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); updateErr != nil {
return ctrl.Result{Requeue: true}, updateErr
}
return ctrl.Result{}, nil
}
+ // A Generation newer than Status.ObservedGeneration means the spec changed since the last reconcile; reset stale state below so the new spec is re-evaluated.
+ specChanged := scheduledApp.Generation != scheduledApp.Status.ObservedGeneration
+ if specChanged {
+ switch scheduledApp.Status.ScheduleState {
+ case v1beta2.ScheduleStateFailedValidation:
+ // Reset to New so the updated spec gets re-validated.
+ logger.Info("Spec changed while in FailedValidation state, resetting to New",
+ "name", scheduledApp.Name, "namespace", scheduledApp.Namespace,
+ "generation", scheduledApp.Generation, "observedGeneration", scheduledApp.Status.ObservedGeneration)
+ scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateNew
+ scheduledApp.Status.Reason = ""
+ scheduledApp.Status.NextRun = metav1.Time{}
+ }
+ }
+
switch scheduledApp.Status.ScheduleState {
case v1beta2.ScheduleStateNew:
now := r.clock.Now()
@@ -175,12 +192,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
scheduledApp.Status.NextRun = metav1.NewTime(nextRunTime)
}
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateScheduled
+ scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if err := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{RequeueAfter: nextRunTime.Sub(now)}, err
case v1beta2.ScheduleStateScheduled:
now := r.clock.Now()
+
+ // The spec changed: discard the previously stored nextRun and recompute it from the (possibly new) schedule, anchored at now.
+ if specChanged {
+ logger.Info("Spec changed, recalculating nextRun",
+ "name", scheduledApp.Name, "namespace", scheduledApp.Namespace,
+ "generation", scheduledApp.Generation, "observedGeneration", scheduledApp.Status.ObservedGeneration)
+ nextRunTime := schedule.Next(now)
+ scheduledApp.Status.NextRun = metav1.NewTime(nextRunTime)
+ scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
+ if err := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); err != nil {
+ return ctrl.Result{Requeue: true}, err
+ }
+ return ctrl.Result{RequeueAfter: nextRunTime.Sub(now)}, nil
+ }
+
nextRunTime := scheduledApp.Status.NextRun
if nextRunTime.IsZero() {
scheduledApp.Status.NextRun = metav1.NewTime(schedule.Next(now))
diff --git a/internal/controller/scheduledsparkapplication/controller_test.go b/internal/controller/scheduledsparkapplication/controller_test.go
index c0c27834ff..a60757a263 100644
--- a/internal/controller/scheduledsparkapplication/controller_test.go
+++ b/internal/controller/scheduledsparkapplication/controller_test.go
@@ -93,6 +93,140 @@ var _ = Describe("ScheduledSparkApplication Controller", func() {
})
})
+var _ = Describe("ScheduledSparkApplication spec change detection", func() {
+ Context("when spec.schedule changes in ScheduleStateScheduled", func() {
+ const resourceName = "repro-schedule-change"
+ ctx := context.Background()
+ typeNamespacedName := types.NamespacedName{
+ Name: resourceName,
+ Namespace: "default",
+ }
+
+ BeforeEach(func() {
+ resource := &v1beta2.ScheduledSparkApplication{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: resourceName,
+ Namespace: "default",
+ },
+ Spec: v1beta2.ScheduledSparkApplicationSpec{
+ Schedule: "10 * * * *",
+ ConcurrencyPolicy: v1beta2.ConcurrencyAllow,
+ Template: v1beta2.SparkApplicationSpec{
+ Type: v1beta2.SparkApplicationTypeScala,
+ Mode: v1beta2.DeployModeCluster,
+ RestartPolicy: v1beta2.RestartPolicy{
+ Type: v1beta2.RestartPolicyNever,
+ },
+ MainApplicationFile: ptr.To("local:///dummy.jar"),
+ },
+ },
+ }
+ Expect(k8sClient.Create(ctx, resource)).To(Succeed())
+ })
+
+ AfterEach(func() {
+ resource := &v1beta2.ScheduledSparkApplication{}
+ err := k8sClient.Get(ctx, typeNamespacedName, resource)
+ if err == nil {
+ Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
+ }
+ })
+
+ It("should recalculate nextRun when spec.schedule changes", func() {
+ reconciler := NewReconciler(k8sClient.Scheme(), k8sClient, nil, clock.RealClock{}, Options{Namespaces: []string{"default"}, TimestampPrecision: "nanos"})
+
+ By("Reconciling to reach ScheduleStateScheduled")
+ _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName})
+ Expect(err).NotTo(HaveOccurred())
+
+ // Verify it's in Scheduled state
+ app := &v1beta2.ScheduledSparkApplication{}
+ Expect(k8sClient.Get(ctx, typeNamespacedName, app)).To(Succeed())
+ Expect(app.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateScheduled))
+ oldNextRun := app.Status.NextRun
+
+ By("Changing spec.schedule from '10 * * * *' to '50 * * * *'")
+ app.Spec.Schedule = "50 * * * *"
+ Expect(k8sClient.Update(ctx, app)).To(Succeed())
+
+ By("Reconciling again after schedule change")
+ _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName})
+ Expect(err).NotTo(HaveOccurred())
+
+ updatedApp := &v1beta2.ScheduledSparkApplication{}
+ Expect(k8sClient.Get(ctx, typeNamespacedName, updatedApp)).To(Succeed())
+ Expect(updatedApp.Status.NextRun.Time).NotTo(Equal(oldNextRun.Time),
+ "nextRun should be recalculated after spec.schedule change")
+ })
+ })
+
+ Context("when spec is fixed after FailedValidation", func() {
+ const resourceName = "repro-failed-validation"
+ ctx := context.Background()
+ typeNamespacedName := types.NamespacedName{
+ Name: resourceName,
+ Namespace: "default",
+ }
+
+ BeforeEach(func() {
+ resource := &v1beta2.ScheduledSparkApplication{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: resourceName,
+ Namespace: "default",
+ },
+ Spec: v1beta2.ScheduledSparkApplicationSpec{
+ Schedule: "invalid-cron",
+ ConcurrencyPolicy: v1beta2.ConcurrencyAllow,
+ Template: v1beta2.SparkApplicationSpec{
+ Type: v1beta2.SparkApplicationTypeScala,
+ Mode: v1beta2.DeployModeCluster,
+ RestartPolicy: v1beta2.RestartPolicy{
+ Type: v1beta2.RestartPolicyNever,
+ },
+ MainApplicationFile: ptr.To("local:///dummy.jar"),
+ },
+ },
+ }
+ Expect(k8sClient.Create(ctx, resource)).To(Succeed())
+ })
+
+ AfterEach(func() {
+ resource := &v1beta2.ScheduledSparkApplication{}
+ err := k8sClient.Get(ctx, typeNamespacedName, resource)
+ if err == nil {
+ Expect(k8sClient.Delete(ctx, resource)).To(Succeed())
+ }
+ })
+
+ It("should recover from FailedValidation when spec is fixed", func() {
+ reconciler := NewReconciler(k8sClient.Scheme(), k8sClient, nil, clock.RealClock{}, Options{Namespaces: []string{"default"}, TimestampPrecision: "nanos"})
+
+ By("Reconciling to reach ScheduleStateFailedValidation")
+ _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName})
+ Expect(err).NotTo(HaveOccurred())
+
+ app := &v1beta2.ScheduledSparkApplication{}
+ Expect(k8sClient.Get(ctx, typeNamespacedName, app)).To(Succeed())
+ Expect(app.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateFailedValidation))
+
+ By("Fixing spec.schedule to a valid cron expression")
+ app.Spec.Schedule = "10 * * * *"
+ Expect(k8sClient.Update(ctx, app)).To(Succeed())
+
+ By("Reconciling again after fixing the schedule")
+ _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: typeNamespacedName})
+ Expect(err).NotTo(HaveOccurred())
+
+ updatedApp := &v1beta2.ScheduledSparkApplication{}
+ Expect(k8sClient.Get(ctx, typeNamespacedName, updatedApp)).To(Succeed())
+ Expect(updatedApp.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateScheduled),
+ "should recover from FailedValidation to Scheduled after spec is fixed")
+ Expect(updatedApp.Status.NextRun.IsZero()).To(BeFalse(),
+ "nextRun should be set after recovering from FailedValidation")
+ })
+ })
+})
+
var _ = Describe("formatTimestamp", func() {
var testTime time.Time