Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/v1beta2/scheduledsparkapplication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ type ScheduledSparkApplicationStatus struct {
ScheduleState ScheduleState `json:"scheduleState,omitempty"`
// Reason tells why the ScheduledSparkApplication is in the particular ScheduleState.
Reason string `json:"reason,omitempty"`
// ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
// It is used to detect spec changes and trigger re-evaluation of the schedule.
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12442,6 +12442,12 @@ spec:
format: date-time
nullable: true
type: string
observedGeneration:
description: |-
ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
It is used to detect spec changes and trigger re-evaluation of the schedule.
format: int64
type: integer
pastFailedRunNames:
description: PastFailedRunNames keeps the names of SparkApplications
for past failed runs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12442,6 +12442,12 @@ spec:
format: date-time
nullable: true
type: string
observedGeneration:
description: |-
ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
It is used to detect spec changes and trigger re-evaluation of the schedule.
format: int64
type: integer
pastFailedRunNames:
description: PastFailedRunNames keeps the names of SparkApplications
for past failed runs.
Expand Down
13 changes: 13 additions & 0 deletions docs/api-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,19 @@ string
<p>Reason tells why the ScheduledSparkApplication is in the particular ScheduleState.</p>
</td>
</tr>
<tr>
<td>
<code>observedGeneration</code><br/>
<em>
int64
</em>
</td>
<td>
<em>(Optional)</em>
<p>ObservedGeneration is the most recent generation observed for this ScheduledSparkApplication.
It is used to detect spec changes and trigger re-evaluation of the schedule.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="sparkoperator.k8s.io/v1beta2.SecretInfo">SecretInfo
Expand Down
33 changes: 33 additions & 0 deletions internal/controller/scheduledsparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
logger.Error(err, "Failed to load timezone location", "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, "timezone", timezone)
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateFailedValidation
scheduledApp.Status.Reason = fmt.Sprintf("Invalid timezone: %v", err)
scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if updateErr := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); updateErr != nil {
return ctrl.Result{Requeue: true}, updateErr
}
Expand All @@ -160,12 +161,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
logger.Error(parseErr, "Failed to parse schedule of ScheduledSparkApplication", "name", scheduledApp.Name, "namespace", scheduledApp.Namespace, "schedule", scheduledApp.Spec.Schedule)
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateFailedValidation
scheduledApp.Status.Reason = parseErr.Error()
scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if updateErr := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); updateErr != nil {
return ctrl.Result{Requeue: true}, updateErr
}
return ctrl.Result{}, nil
}

// Detect spec changes and reset state to trigger re-evaluation.
specChanged := scheduledApp.Generation != scheduledApp.Status.ObservedGeneration
if specChanged {
switch scheduledApp.Status.ScheduleState {
case v1beta2.ScheduleStateFailedValidation:
// Reset to New so the updated spec gets re-validated.
logger.Info("Spec changed while in FailedValidation state, resetting to New",
"name", scheduledApp.Name, "namespace", scheduledApp.Namespace,
"generation", scheduledApp.Generation, "observedGeneration", scheduledApp.Status.ObservedGeneration)
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateNew
scheduledApp.Status.Reason = ""
scheduledApp.Status.NextRun = metav1.Time{}
}
}

switch scheduledApp.Status.ScheduleState {
case v1beta2.ScheduleStateNew:
now := r.clock.Now()
Expand All @@ -175,12 +192,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
scheduledApp.Status.NextRun = metav1.NewTime(nextRunTime)
}
scheduledApp.Status.ScheduleState = v1beta2.ScheduleStateScheduled
scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if err := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{RequeueAfter: nextRunTime.Sub(now)}, err
case v1beta2.ScheduleStateScheduled:
now := r.clock.Now()

// Recalculate nextRun if the spec has changed.
if specChanged {
logger.Info("Spec changed, recalculating nextRun",
"name", scheduledApp.Name, "namespace", scheduledApp.Namespace,
"generation", scheduledApp.Generation, "observedGeneration", scheduledApp.Status.ObservedGeneration)
nextRunTime := schedule.Next(now)
scheduledApp.Status.NextRun = metav1.NewTime(nextRunTime)
scheduledApp.Status.ObservedGeneration = scheduledApp.Generation
if err := r.updateScheduledSparkApplicationStatus(ctx, scheduledApp); err != nil {
return ctrl.Result{Requeue: true}, err
}
return ctrl.Result{RequeueAfter: nextRunTime.Sub(now)}, nil
}

nextRunTime := scheduledApp.Status.NextRun
if nextRunTime.IsZero() {
scheduledApp.Status.NextRun = metav1.NewTime(schedule.Next(now))
Expand Down
134 changes: 134 additions & 0 deletions internal/controller/scheduledsparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,140 @@ var _ = Describe("ScheduledSparkApplication Controller", func() {
})
})

// Specs covering how the reconciler reacts when the spec of an existing
// ScheduledSparkApplication is edited: status.nextRun must be recomputed for
// an object that is already Scheduled, and an object stuck in FailedValidation
// must move to Scheduled once its schedule becomes valid.
var _ = Describe("ScheduledSparkApplication spec change detection", func() {
	// buildScheduledApp returns a minimal ScheduledSparkApplication identified
	// by key and using the given cron expression as spec.schedule.
	buildScheduledApp := func(key types.NamespacedName, cron string) *v1beta2.ScheduledSparkApplication {
		return &v1beta2.ScheduledSparkApplication{
			ObjectMeta: metav1.ObjectMeta{
				Name:      key.Name,
				Namespace: key.Namespace,
			},
			Spec: v1beta2.ScheduledSparkApplicationSpec{
				Schedule:          cron,
				ConcurrencyPolicy: v1beta2.ConcurrencyAllow,
				Template: v1beta2.SparkApplicationSpec{
					Type: v1beta2.SparkApplicationTypeScala,
					Mode: v1beta2.DeployModeCluster,
					RestartPolicy: v1beta2.RestartPolicy{
						Type: v1beta2.RestartPolicyNever,
					},
					MainApplicationFile: ptr.To("local:///dummy.jar"),
				},
			},
		}
	}

	// removeIfPresent deletes the object behind key; a failed lookup is
	// treated as "nothing to clean up".
	removeIfPresent := func(ctx context.Context, key types.NamespacedName) {
		existing := &v1beta2.ScheduledSparkApplication{}
		if err := k8sClient.Get(ctx, key, existing); err != nil {
			return
		}
		Expect(k8sClient.Delete(ctx, existing)).To(Succeed())
	}

	// fetch reads the current state of the object behind key and fails the
	// spec if the read does not succeed.
	fetch := func(ctx context.Context, key types.NamespacedName) *v1beta2.ScheduledSparkApplication {
		current := &v1beta2.ScheduledSparkApplication{}
		Expect(k8sClient.Get(ctx, key, current)).To(Succeed())
		return current
	}

	// newReconcileStep builds one reconciler and returns a function that runs
	// a single reconcile pass for key with it, failing the spec on error.
	newReconcileStep := func(ctx context.Context, key types.NamespacedName) func() {
		r := NewReconciler(k8sClient.Scheme(), k8sClient, nil, clock.RealClock{}, Options{Namespaces: []string{"default"}, TimestampPrecision: "nanos"})
		return func() {
			_, err := r.Reconcile(ctx, reconcile.Request{NamespacedName: key})
			Expect(err).NotTo(HaveOccurred())
		}
	}

	Context("when spec.schedule changes in ScheduleStateScheduled", func() {
		const resourceName = "repro-schedule-change"
		testCtx := context.Background()
		key := types.NamespacedName{
			Name:      resourceName,
			Namespace: "default",
		}

		BeforeEach(func() {
			Expect(k8sClient.Create(testCtx, buildScheduledApp(key, "10 * * * *"))).To(Succeed())
		})

		AfterEach(func() {
			removeIfPresent(testCtx, key)
		})

		It("should recalculate nextRun when spec.schedule changes", func() {
			runReconcile := newReconcileStep(testCtx, key)

			By("Reconciling to reach ScheduleStateScheduled")
			runReconcile()

			// The first pass must leave the object Scheduled; remember the
			// nextRun it computed so the later comparison has a baseline.
			scheduled := fetch(testCtx, key)
			Expect(scheduled.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateScheduled))
			previousNextRun := scheduled.Status.NextRun

			By("Changing spec.schedule from '10 * * * *' to '50 * * * *'")
			scheduled.Spec.Schedule = "50 * * * *"
			Expect(k8sClient.Update(testCtx, scheduled)).To(Succeed())

			By("Reconciling again after schedule change")
			runReconcile()

			refreshed := fetch(testCtx, key)
			Expect(refreshed.Status.NextRun.Time).NotTo(Equal(previousNextRun.Time),
				"nextRun should be recalculated after spec.schedule change")
		})
	})

	Context("when spec is fixed after FailedValidation", func() {
		const resourceName = "repro-failed-validation"
		testCtx := context.Background()
		key := types.NamespacedName{
			Name:      resourceName,
			Namespace: "default",
		}

		BeforeEach(func() {
			Expect(k8sClient.Create(testCtx, buildScheduledApp(key, "invalid-cron"))).To(Succeed())
		})

		AfterEach(func() {
			removeIfPresent(testCtx, key)
		})

		It("should recover from FailedValidation when spec is fixed", func() {
			runReconcile := newReconcileStep(testCtx, key)

			By("Reconciling to reach ScheduleStateFailedValidation")
			runReconcile()

			rejected := fetch(testCtx, key)
			Expect(rejected.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateFailedValidation))

			By("Fixing spec.schedule to a valid cron expression")
			rejected.Spec.Schedule = "10 * * * *"
			Expect(k8sClient.Update(testCtx, rejected)).To(Succeed())

			By("Reconciling again after fixing the schedule")
			runReconcile()

			recovered := fetch(testCtx, key)
			Expect(recovered.Status.ScheduleState).To(Equal(v1beta2.ScheduleStateScheduled),
				"should recover from FailedValidation to Scheduled after spec is fixed")
			Expect(recovered.Status.NextRun.IsZero()).To(BeFalse(),
				"nextRun should be set after recovering from FailedValidation")
		})
	})
})

var _ = Describe("formatTimestamp", func() {
var testTime time.Time

Expand Down
Loading