Skip to content

Commit dc2728a

Browse files
update
1 parent b4d59e2 commit dc2728a

File tree

5 files changed

+184
-33
lines changed

5 files changed

+184
-33
lines changed

controllers/common.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ const (
4646
CleanUpFinalizerName = "cleanup.subscription.finalizer"
4747
)
4848

49+
func shouldPauseNonGenerationRollout(object metav1.Object, isNewGeneration bool) bool {
50+
return spec.IsPauseRollout(object) && !isNewGeneration && object.GetDeletionTimestamp().IsZero()
51+
}
52+
53+
func shouldApplyPausedResourceAction(action v1alpha1.ReconcileAction) bool {
54+
return action == v1alpha1.Create || action == v1alpha1.Delete
55+
}
56+
4957
func deleteHPAV2Beta2(ctx context.Context, r client.Client, name types.NamespacedName) error {
5058
hpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
5159
err := r.Get(ctx, name, hpa)

controllers/common_pause_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package controllers
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/streamnative/function-mesh/api/compute/v1alpha1"
8+
"github.com/streamnative/function-mesh/utils"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
)
11+
12+
func TestShouldPauseNonGenerationRollout(t *testing.T) {
13+
oldPauseRollout := utils.PauseRollout
14+
t.Cleanup(func() {
15+
utils.PauseRollout = oldPauseRollout
16+
})
17+
18+
utils.PauseRollout = true
19+
object := &metav1.ObjectMeta{}
20+
if !shouldPauseNonGenerationRollout(object, false) {
21+
t.Fatal("expected pause rollout for non-generation reconcile")
22+
}
23+
24+
if shouldPauseNonGenerationRollout(object, true) {
25+
t.Fatal("did not expect pause rollout for new generation")
26+
}
27+
28+
now := metav1.NewTime(time.Now())
29+
object.DeletionTimestamp = &now
30+
if shouldPauseNonGenerationRollout(object, false) {
31+
t.Fatal("did not expect pause rollout while object is deleting")
32+
}
33+
}
34+
35+
func TestShouldApplyPausedResourceAction(t *testing.T) {
36+
if !shouldApplyPausedResourceAction(v1alpha1.Create) {
37+
t.Fatal("expected create action to be allowed while paused")
38+
}
39+
if !shouldApplyPausedResourceAction(v1alpha1.Delete) {
40+
t.Fatal("expected delete action to be allowed while paused")
41+
}
42+
43+
blockedActions := []v1alpha1.ReconcileAction{
44+
v1alpha1.Update,
45+
v1alpha1.Wait,
46+
v1alpha1.NoAction,
47+
}
48+
for _, action := range blockedActions {
49+
if shouldApplyPausedResourceAction(action) {
50+
t.Fatalf("did not expect %s action to be allowed while paused", action)
51+
}
52+
}
53+
}

controllers/function_controller.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,22 +100,13 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
100100
}
101101

102102
isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function)
103+
pauseRollout := shouldPauseNonGenerationRollout(function, isNewGeneration)
104+
function.Status.PendingChange = ""
103105

104106
err = r.ObserveFunctionStatefulSet(ctx, function)
105107
if err != nil {
106108
return reconcile.Result{}, err
107109
}
108-
// skip reconcile if pauseRollout is set to true and the generation is not increased
109-
if spec.IsPauseRollout(function) && !isNewGeneration {
110-
err = r.Status().Update(ctx, function)
111-
if err != nil {
112-
r.Log.Error(err, "failed to update function status after observing statefulset")
113-
return ctrl.Result{}, err
114-
}
115-
return ctrl.Result{}, nil
116-
} else {
117-
function.Status.PendingChange = ""
118-
}
119110

120111
err = r.ObserveFunctionService(ctx, function)
121112
if err != nil {
@@ -144,6 +135,45 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
144135
return ctrl.Result{}, err
145136
}
146137

138+
if pauseRollout {
139+
if shouldApplyPausedResourceAction(function.Status.Conditions[v1alpha1.StatefulSet].Action) {
140+
err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration)
141+
if err != nil {
142+
return reconcile.Result{}, err
143+
}
144+
}
145+
if shouldApplyPausedResourceAction(function.Status.Conditions[v1alpha1.Service].Action) {
146+
err = r.ApplyFunctionService(ctx, function, isNewGeneration)
147+
if err != nil {
148+
return reconcile.Result{}, err
149+
}
150+
}
151+
if condition, ok := function.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
152+
if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 {
153+
err = r.ApplyFunctionHPAV2Beta2(ctx, function, isNewGeneration)
154+
if err != nil {
155+
return reconcile.Result{}, err
156+
}
157+
} else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 {
158+
err = r.ApplyFunctionHPA(ctx, function, isNewGeneration)
159+
if err != nil {
160+
return reconcile.Result{}, err
161+
}
162+
}
163+
}
164+
if condition, ok := function.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
165+
err = r.ApplyFunctionVPA(ctx, function)
166+
if err != nil {
167+
return reconcile.Result{}, err
168+
}
169+
}
170+
err = r.ApplyFunctionCleanUpJob(ctx, function)
171+
if err != nil {
172+
return reconcile.Result{}, err
173+
}
174+
return ctrl.Result{}, nil
175+
}
176+
147177
err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration)
148178
if err != nil {
149179
return reconcile.Result{}, err

controllers/sink_controller.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,13 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
9999
}
100100

101101
isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink)
102+
pauseRollout := shouldPauseNonGenerationRollout(sink, isNewGeneration)
103+
sink.Status.PendingChange = ""
102104

103105
err = r.ObserveSinkStatefulSet(ctx, sink)
104106
if err != nil {
105107
return reconcile.Result{}, err
106108
}
107-
// skip reconcile if pauseRollout is set to true and the generation is not increased
108-
if spec.IsPauseRollout(sink) && !isNewGeneration {
109-
err = r.Status().Update(ctx, sink)
110-
if err != nil {
111-
r.Log.Error(err, "failed to update sink status after observing statefulset")
112-
return ctrl.Result{}, err
113-
}
114-
return ctrl.Result{}, nil
115-
} else {
116-
sink.Status.PendingChange = ""
117-
}
118109

119110
err = r.ObserveSinkService(ctx, sink)
120111
if err != nil {
@@ -143,6 +134,45 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
143134
return ctrl.Result{}, err
144135
}
145136

137+
if pauseRollout {
138+
if shouldApplyPausedResourceAction(sink.Status.Conditions[v1alpha1.StatefulSet].Action) {
139+
err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration)
140+
if err != nil {
141+
return reconcile.Result{}, err
142+
}
143+
}
144+
if shouldApplyPausedResourceAction(sink.Status.Conditions[v1alpha1.Service].Action) {
145+
err = r.ApplySinkService(ctx, sink, isNewGeneration)
146+
if err != nil {
147+
return reconcile.Result{}, err
148+
}
149+
}
150+
if condition, ok := sink.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
151+
if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 {
152+
err = r.ApplySinkHPAV2Beta2(ctx, sink, isNewGeneration)
153+
if err != nil {
154+
return reconcile.Result{}, err
155+
}
156+
} else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 {
157+
err = r.ApplySinkHPA(ctx, sink, isNewGeneration)
158+
if err != nil {
159+
return reconcile.Result{}, err
160+
}
161+
}
162+
}
163+
if condition, ok := sink.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
164+
err = r.ApplySinkVPA(ctx, sink)
165+
if err != nil {
166+
return reconcile.Result{}, err
167+
}
168+
}
169+
err = r.ApplySinkCleanUpJob(ctx, sink)
170+
if err != nil {
171+
return reconcile.Result{}, err
172+
}
173+
return ctrl.Result{}, nil
174+
}
175+
146176
err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration)
147177
if err != nil {
148178
return reconcile.Result{}, err

controllers/source_controller.go

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,22 +99,13 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
9999
}
100100

101101
isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source)
102+
pauseRollout := shouldPauseNonGenerationRollout(source, isNewGeneration)
103+
source.Status.PendingChange = ""
102104

103105
err = r.ObserveSourceStatefulSet(ctx, source)
104106
if err != nil {
105107
return reconcile.Result{}, err
106108
}
107-
// skip reconcile if pauseRollout is set to true and the generation is not increased
108-
if spec.IsPauseRollout(source) && !isNewGeneration {
109-
err = r.Status().Update(ctx, source)
110-
if err != nil {
111-
r.Log.Error(err, "failed to update source status after observing statefulset")
112-
return ctrl.Result{}, err
113-
}
114-
return ctrl.Result{}, nil
115-
} else {
116-
source.Status.PendingChange = ""
117-
}
118109

119110
err = r.ObserveSourceService(ctx, source)
120111
if err != nil {
@@ -143,6 +134,45 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
143134
return ctrl.Result{}, err
144135
}
145136

137+
if pauseRollout {
138+
if shouldApplyPausedResourceAction(source.Status.Conditions[v1alpha1.StatefulSet].Action) {
139+
err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration)
140+
if err != nil {
141+
return reconcile.Result{}, err
142+
}
143+
}
144+
if shouldApplyPausedResourceAction(source.Status.Conditions[v1alpha1.Service].Action) {
145+
err = r.ApplySourceService(ctx, source, isNewGeneration)
146+
if err != nil {
147+
return reconcile.Result{}, err
148+
}
149+
}
150+
if condition, ok := source.Status.Conditions[v1alpha1.HPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
151+
if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2Beta2 {
152+
err = r.ApplySourceHPAV2Beta2(ctx, source, isNewGeneration)
153+
if err != nil {
154+
return reconcile.Result{}, err
155+
}
156+
} else if r.GroupVersionFlags != nil && r.GroupVersionFlags.APIAutoscalingGroupVersion == utils.GroupVersionV2 {
157+
err = r.ApplySourceHPA(ctx, source, isNewGeneration)
158+
if err != nil {
159+
return reconcile.Result{}, err
160+
}
161+
}
162+
}
163+
if condition, ok := source.Status.Conditions[v1alpha1.VPA]; ok && shouldApplyPausedResourceAction(condition.Action) {
164+
err = r.ApplySourceVPA(ctx, source)
165+
if err != nil {
166+
return reconcile.Result{}, err
167+
}
168+
}
169+
err = r.ApplySourceCleanUpJob(ctx, source)
170+
if err != nil {
171+
return reconcile.Result{}, err
172+
}
173+
return ctrl.Result{}, nil
174+
}
175+
146176
err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration)
147177
if err != nil {
148178
return reconcile.Result{}, err

0 commit comments

Comments
 (0)