Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions controllers/update_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c updateStatus) reconcile(
}

updateFaultDomains(logger, processMap, &clusterStatus)
err = refreshProcessGroupStatus(ctx, r, cluster, &clusterStatus)
err = refreshProcessGroupStatus(ctx, r, logger, cluster, &clusterStatus)
if err != nil {
return &requeue{
curError: fmt.Errorf(
Expand Down Expand Up @@ -596,7 +596,7 @@ func validateProcessGroups(
(ok || processGroup.ProcessClass == fdbv1beta2.ProcessClassTest) {
logger.V(1).
Info("Process group is being removed without exclusion", "ProcessGroupID", processGroup.ProcessGroupID)
processGroup.ExclusionSkipped = ok
processGroup.ExclusionSkipped = true
processGroup.SetExclude()
}
}
Expand Down Expand Up @@ -1064,6 +1064,7 @@ func checkIfNodeHasTaintsAndUpdateConditions(
func refreshProcessGroupStatus(
ctx context.Context,
r *FoundationDBClusterReconciler,
logger logr.Logger,
cluster *fdbv1beta2.FoundationDBCluster,
status *fdbv1beta2.FoundationDBClusterStatus,
) error {
Expand All @@ -1074,7 +1075,7 @@ func refreshProcessGroupStatus(
}

// Track all created resources this will ensure that we catch all resources that are created by the operator
// even if the process group is currently missing for some reasons.
// even if the process group is currently missing for some reason.
pods, err := r.PodLifecycleManager.GetPods(
ctx,
r,
Expand All @@ -1090,6 +1091,11 @@ func refreshProcessGroupStatus(
continue
}

if !pod.DeletionTimestamp.IsZero() {
continue
}

logger.Info("found pod with missing process group", "processGroupID", processGroupID)
// Since we found a new process group we have to add it to our map.
knownProcessGroups[processGroupID] = fdbv1beta2.None{}
status.ProcessGroups = append(
Expand All @@ -1114,6 +1120,11 @@ func refreshProcessGroupStatus(
continue
}

if !pvc.DeletionTimestamp.IsZero() {
continue
}

logger.Info("found pvc with missing process group", "processGroupID", processGroupID)
// Since we found a new process group we have to add it to our map.
knownProcessGroups[processGroupID] = fdbv1beta2.None{}
status.ProcessGroups = append(
Expand All @@ -1139,11 +1150,14 @@ func refreshProcessGroupStatus(
if processGroupID == "" {
continue
}

if !service.DeletionTimestamp.IsZero() {
continue
}
if _, ok := knownProcessGroups[processGroupID]; ok {
continue
}

logger.Info("found service with missing process group", "processGroupID", processGroupID)
// Since we found a new process group we have to add it to our map.
knownProcessGroups[processGroupID] = fdbv1beta2.None{}
status.ProcessGroups = append(
Expand Down
265 changes: 265 additions & 0 deletions controllers/update_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package controllers

import (
"context"
"fmt"
"math"
"time"

Expand Down Expand Up @@ -1702,4 +1703,268 @@ var _ = Describe("update_status", func() {
})
})
})

When("refreshing process group status", func() {
var cluster *fdbv1beta2.FoundationDBCluster
var clusterStatus fdbv1beta2.FoundationDBClusterStatus

BeforeEach(func() {
cluster = internal.CreateDefaultCluster()
Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred())
clusterStatus = *cluster.Status.DeepCopy()
})

It("should not add any process groups when all are already tracked", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount))
})

When("a pod with an unknown process group ID exists", func() {
var unknownID fdbv1beta2.ProcessGroupID

BeforeEach(func() {
unknownID = "storage-99"
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), pod)).To(Succeed())
})

It("should add the process group from the pod", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount + 1))
ids := make([]fdbv1beta2.ProcessGroupID, 0, len(clusterStatus.ProcessGroups))
for _, pg := range clusterStatus.ProcessGroups {
ids = append(ids, pg.ProcessGroupID)
}
Expect(ids).To(ContainElement(unknownID))
})
})

When("a pod with an unknown process group ID is being deleted", func() {
BeforeEach(func() {
unknownID := fdbv1beta2.ProcessGroupID("storage-99")
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), pod)).To(Succeed())
Expect(k8sClient.MockStuckTermination(pod, true)).To(Succeed())
Expect(k8sClient.Delete(context.TODO(), pod)).To(Succeed())
})

It("should not add the process group", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount))
})
})

When("a PVC with an unknown process group ID exists", func() {
var unknownID fdbv1beta2.ProcessGroupID

BeforeEach(func() {
unknownID = "storage-99"
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), pvc)).To(Succeed())
})

It("should add the process group from the PVC", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount + 1))
ids := make([]fdbv1beta2.ProcessGroupID, 0, len(clusterStatus.ProcessGroups))
for _, pg := range clusterStatus.ProcessGroups {
ids = append(ids, pg.ProcessGroupID)
}
Expect(ids).To(ContainElement(unknownID))
})
})

When("a PVC with an unknown process group ID is being deleted", func() {
BeforeEach(func() {
unknownID := fdbv1beta2.ProcessGroupID("storage-99")
pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), pvc)).To(Succeed())
Expect(k8sClient.MockStuckTermination(pvc, true)).To(Succeed())
Expect(k8sClient.Delete(context.TODO(), pvc)).To(Succeed())
})

It("should not add the process group", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount))
})
})

When("a service with an unknown process group ID exists", func() {
var unknownID fdbv1beta2.ProcessGroupID

BeforeEach(func() {
unknownID = "storage-99"
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), svc)).To(Succeed())
})

It("should add the process group from the service", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount + 1))
ids := make([]fdbv1beta2.ProcessGroupID, 0, len(clusterStatus.ProcessGroups))
for _, pg := range clusterStatus.ProcessGroups {
ids = append(ids, pg.ProcessGroupID)
}
Expect(ids).To(ContainElement(unknownID))
})
})

When("a service without a process group ID label exists", func() {
BeforeEach(func() {
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name + "-headless",
Namespace: cluster.Namespace,
Labels: cluster.GetMatchLabels(),
},
}
Expect(k8sClient.Create(context.TODO(), svc)).To(Succeed())
})

It("should not add any process group", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount))
})
})

When("a service with an unknown process group ID is being deleted", func() {
BeforeEach(func() {
unknownID := fdbv1beta2.ProcessGroupID("storage-99")
svc := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), svc)).To(Succeed())
Expect(k8sClient.MockStuckTermination(svc, true)).To(Succeed())
Expect(k8sClient.Delete(context.TODO(), svc)).To(Succeed())
})

It("should not add the process group", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount))
})
})

When("the same unknown process group ID appears in both a pod and a PVC", func() {
var unknownID fdbv1beta2.ProcessGroupID

BeforeEach(func() {
unknownID = "storage-99"
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), pod)).To(Succeed())

pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("data-%s-%s", cluster.Name, unknownID),
Namespace: cluster.Namespace,
Labels: internal.GetPodMatchLabels(
cluster,
fdbv1beta2.ProcessClassStorage,
string(unknownID),
),
},
}
Expect(k8sClient.Create(context.TODO(), pvc)).To(Succeed())
})

It("should add the process group exactly once", func() {
initialCount := len(clusterStatus.ProcessGroups)
Expect(refreshProcessGroupStatus(
context.TODO(), clusterReconciler, logger, cluster, &clusterStatus,
)).To(Succeed())
Expect(clusterStatus.ProcessGroups).To(HaveLen(initialCount + 1))
})
})
})
})
2 changes: 1 addition & 1 deletion internal/locality/locality.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func CheckCoordinatorValidity(
}

if process.ProcessClass == fdbv1beta2.ProcessClassTest {
pLogger.V(1).Info("Ignoring tester process")
pLogger.V(1).Info("Ignoring tester process", "processGroupID", processGroupID)
continue
}

Expand Down
2 changes: 0 additions & 2 deletions mock-kubernetes-client/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ func (client *MockClient) List(
list ctrlClient.ObjectList,
options ...ctrlClient.ListOption,
) error {
// TODO (johscheuer): Once https://github.com/kubernetes-sigs/controller-runtime/pull/2025 is merged and we update the
// controller-runtime we will support field selectors.
return client.fakeClient.List(ctx, list, options...)
}

Expand Down
Loading