Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ce0d4af

Browse files
committed May 7, 2018
Initial implementation for the statefulset annotations indicating rolling updates.
1 parent 43a1db2 commit ce0d4af

File tree

5 files changed: 95 additions (+95) and 94 deletions (-94).
 

‎.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# Folders
77
_obj
88
_test
9+
_manifests
910

1011
# Architecture specific extensions/prefixes
1112
*.[568vq]

‎pkg/cluster/cluster.go

+18-16
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,12 @@ type Cluster struct {
7272
deleteOptions *metav1.DeleteOptions
7373
podEventsQueue *cache.FIFO
7474

75-
teamsAPIClient teams.Interface
76-
oauthTokenGetter OAuthTokenGetter
77-
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
78-
currentProcess spec.Process
79-
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
80-
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
81-
pendingRollingUpdate *bool // indicates the cluster needs a rolling update
75+
teamsAPIClient teams.Interface
76+
oauthTokenGetter OAuthTokenGetter
77+
KubeClient k8sutil.KubernetesClient //TODO: move clients to the better place?
78+
currentProcess spec.Process
79+
processMu sync.RWMutex // protects the current operation for reporting, no need to hold the master mutex
80+
specMu sync.RWMutex // protects the spec for reporting, no need to hold the master mutex
8281
}
8382

8483
type compareStatefulsetResult struct {
@@ -111,11 +110,10 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec spec.Postgresql
111110
Secrets: make(map[types.UID]*v1.Secret),
112111
Services: make(map[PostgresRole]*v1.Service),
113112
Endpoints: make(map[PostgresRole]*v1.Endpoints)},
114-
userSyncStrategy: users.DefaultUserSyncStrategy{},
115-
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
116-
podEventsQueue: podEventsQueue,
117-
KubeClient: kubeClient,
118-
pendingRollingUpdate: nil,
113+
userSyncStrategy: users.DefaultUserSyncStrategy{},
114+
deleteOptions: &metav1.DeleteOptions{OrphanDependents: &orphanDependents},
115+
podEventsQueue: podEventsQueue,
116+
KubeClient: kubeClient,
119117
}
120118
cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
121119
cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
@@ -251,7 +249,6 @@ func (c *Cluster) Create() error {
251249
}()
252250

253251
c.setStatus(spec.ClusterStatusCreating)
254-
c.setPendingRollingUpgrade(false)
255252

256253
for _, role := range []PostgresRole{Master, Replica} {
257254

@@ -301,7 +298,7 @@ func (c *Cluster) Create() error {
301298
if c.Statefulset != nil {
302299
return fmt.Errorf("statefulset already exists in the cluster")
303300
}
304-
ss, err = c.createStatefulSet()
301+
ss, err = c.createStatefulSet(false)
305302
if err != nil {
306303
return fmt.Errorf("could not create statefulset: %v", err)
307304
}
@@ -345,6 +342,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
345342
match = false
346343
reasons = append(reasons, "new statefulset's number of replicas doesn't match the current one")
347344
}
345+
if !reflect.DeepEqual(c.Statefulset.Annotations, statefulSet.Annotations) {
346+
match = false
347+
reasons = append(reasons, "new statefulset's annotations doesn't match the current one")
348+
}
348349
if len(c.Statefulset.Spec.Template.Spec.Containers) != len(statefulSet.Spec.Template.Spec.Containers) {
349350
needsRollUpdate = true
350351
reasons = append(reasons, "new statefulset's container specification doesn't match the current one")
@@ -396,9 +397,10 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *v1beta1.StatefulSet) *comp
396397
}
397398

398399
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations) {
399-
needsRollUpdate = true
400+
match = false
400401
needsReplace = true
401-
reasons = append(reasons, "new statefulset's metadata annotations doesn't match the current one")
402+
needsRollUpdate = true
403+
reasons = append(reasons, "new statefulset's pod template metadata annotations doesn't match the current one")
402404
}
403405
if len(c.Statefulset.Spec.VolumeClaimTemplates) != len(statefulSet.Spec.VolumeClaimTemplates) {
404406
needsReplace = true

‎pkg/cluster/resources.go

+47-3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ import (
1717
"github.com/zalando-incubator/postgres-operator/pkg/util/retryutil"
1818
)
1919

20+
const (
21+
RollingUpdateStatefulsetAnnotationKey = "zalando-postgres-operator-rolling-update"
22+
)
23+
2024
func (c *Cluster) listResources() error {
2125
if c.PodDisruptionBudget != nil {
2226
c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID)
@@ -59,9 +63,38 @@ func (c *Cluster) listResources() error {
5963
return nil
6064
}
6165

62-
func (c *Cluster) createStatefulSet() (*v1beta1.StatefulSet, error) {
66+
func setRollingUpdateFlag(sset *v1beta1.StatefulSet, val bool) {
67+
anno := sset.GetAnnotations()
68+
fmt.Printf("rolling upgrade flag has been set to %t", val)
69+
if anno == nil {
70+
anno = make(map[string]string)
71+
}
72+
anno[RollingUpdateStatefulsetAnnotationKey] = strconv.FormatBool(val)
73+
sset.SetAnnotations(anno)
74+
}
75+
76+
func getRollingUpdateFlag(sset *v1beta1.StatefulSet, defaultValue bool) (flag bool) {
77+
anno := sset.GetAnnotations()
78+
flag = defaultValue
79+
80+
stringFlag, exists := anno[RollingUpdateStatefulsetAnnotationKey]
81+
if exists {
82+
var err error
83+
if flag, err = strconv.ParseBool(stringFlag); err != nil {
84+
fmt.Printf("error when parsing %s annotation for the statefulset %s: expected boolean value, got %s\n",
85+
RollingUpdateStatefulsetAnnotationKey,
86+
types.NamespacedName{sset.Namespace, sset.Name},
87+
stringFlag)
88+
flag = defaultValue
89+
}
90+
}
91+
return flag
92+
}
93+
94+
func (c *Cluster) createStatefulSet(pendingRollingUpgrade bool) (*v1beta1.StatefulSet, error) {
6395
c.setProcessName("creating statefulset")
6496
statefulSetSpec, err := c.generateStatefulSet(&c.Spec)
97+
setRollingUpdateFlag(statefulSetSpec, pendingRollingUpgrade)
6598
if err != nil {
6699
return nil, fmt.Errorf("could not generate statefulset: %v", err)
67100
}
@@ -128,7 +161,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
128161
return nil
129162
}
130163

131-
func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
164+
func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet, includeAnnotations bool) error {
132165
c.setProcessName("updating statefulset")
133166
if c.Statefulset == nil {
134167
return fmt.Errorf("there is no statefulset in the cluster")
@@ -153,8 +186,19 @@ func (c *Cluster) updateStatefulSet(newStatefulSet *v1beta1.StatefulSet) error {
153186
types.MergePatchType,
154187
patchData, "")
155188
if err != nil {
156-
return fmt.Errorf("could not patch statefulset %q: %v", statefulSetName, err)
189+
return fmt.Errorf("could not patch statefulset spec %q: %v", statefulSetName, err)
157190
}
191+
if includeAnnotations && newStatefulSet.Annotations != nil {
192+
patchData := metadataAnnotationsPatch(newStatefulSet.Annotations)
193+
statefulSet, err = c.KubeClient.StatefulSets(c.Statefulset.Namespace).Patch(
194+
c.Statefulset.Name,
195+
types.StrategicMergePatchType,
196+
[]byte(patchData), "")
197+
if err != nil {
198+
return fmt.Errorf("could not patch statefulset annotations %q: %v", patchData, err)
199+
}
200+
}
201+
158202
c.Statefulset = statefulSet
159203

160204
return nil

‎pkg/cluster/sync.go

+29-18
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,9 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error {
220220
}
221221

222222
func (c *Cluster) syncStatefulSet() error {
223-
223+
var (
224+
cachedRollingUpdateFlag, podsRollingUpdateRequired bool
225+
)
224226
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
225227
if err != nil {
226228
if !k8sutil.ResourceNotFound(err) {
@@ -234,7 +236,8 @@ func (c *Cluster) syncStatefulSet() error {
234236
return fmt.Errorf("could not list pods of the statefulset: %v", err)
235237
}
236238

237-
sset, err = c.createStatefulSet()
239+
podsRollingUpdateRequired := (len(pods) > 0)
240+
sset, err = c.createStatefulSet(podsRollingUpdateRequired)
238241
if err != nil {
239242
return fmt.Errorf("could not create missing statefulset: %v", err)
240243
}
@@ -244,36 +247,42 @@ func (c *Cluster) syncStatefulSet() error {
244247
}
245248

246249
c.logger.Infof("created missing statefulset %q", util.NameFromMeta(sset.ObjectMeta))
247-
if len(pods) <= 0 {
248-
return nil
249-
}
250-
c.logger.Infof("found pods without the statefulset: trigger rolling update")
251-
c.setPendingRollingUpgrade(true)
252250

253251
} else {
252+
if c.Statefulset != nil {
253+
// if we reset the rolling update flag in the statefulset structure in memory but didn't manage to update
254+
// the actual object in Kubernetes for some reason we want to avoid doing an unnecessary update by relying
255+
// on the 'cached' in-memory flag.
256+
cachedRollingUpdateFlag = getRollingUpdateFlag(c.Statefulset, true)
257+
c.logger.Debugf("cached statefulset value exists, rollingUpdate flag is %t", cachedRollingUpdateFlag)
258+
}
254259
// statefulset is already there, make sure we use its definition in order to compare with the spec.
255260
c.Statefulset = sset
256-
// resolve the pending rolling upgrade flags as soon as we read an actual statefulset from kubernetes.
257-
// we must do it before updating statefulsets; after an update, the statfulset will receive a new
258-
// updateRevision, different from the one the pods run with.
259-
if err := c.resolvePendingRollingUpdate(sset); err != nil {
260-
return fmt.Errorf("could not resolve the rolling upgrade status: %v", err)
261+
if podsRollingUpdateRequired = getRollingUpdateFlag(c.Statefulset, false); podsRollingUpdateRequired {
262+
if cachedRollingUpdateFlag {
263+
c.logger.Infof("found a statefulset with an unfinished pods rolling update")
264+
} else {
265+
c.logger.Infof("clearing the rolling update flag based on the cached information")
266+
podsRollingUpdateRequired = false
267+
}
261268
}
262269

263270
desiredSS, err := c.generateStatefulSet(&c.Spec)
264271
if err != nil {
265272
return fmt.Errorf("could not generate statefulset: %v", err)
266273
}
274+
setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired)
267275

268276
cmp := c.compareStatefulSetWith(desiredSS)
269277
if !cmp.match {
270-
if cmp.rollingUpdate {
271-
c.setPendingRollingUpgrade(true)
278+
if cmp.rollingUpdate && !podsRollingUpdateRequired {
279+
podsRollingUpdateRequired = true
280+
setRollingUpdateFlag(desiredSS, podsRollingUpdateRequired)
272281
}
273282
c.logStatefulSetChanges(c.Statefulset, desiredSS, false, cmp.reasons)
274283

275284
if !cmp.replace {
276-
if err := c.updateStatefulSet(desiredSS); err != nil {
285+
if err := c.updateStatefulSet(desiredSS, true); err != nil {
277286
return fmt.Errorf("could not update statefulset: %v", err)
278287
}
279288
} else {
@@ -285,15 +294,17 @@ func (c *Cluster) syncStatefulSet() error {
285294
}
286295
// if we get here we also need to re-create the pods (either leftovers from the old
287296
// statefulset or those that got their configuration from the outdated statefulset)
288-
if *c.pendingRollingUpdate {
297+
if podsRollingUpdateRequired {
289298
c.logger.Debugln("performing rolling update")
290299
if err := c.recreatePods(); err != nil {
291300
return fmt.Errorf("could not recreate pods: %v", err)
292301
}
293-
c.setPendingRollingUpgrade(false)
294302
c.logger.Infof("pods have been recreated")
303+
setRollingUpdateFlag(c.Statefulset, false)
304+
if err := c.updateStatefulSet(c.Statefulset, true); err != nil {
305+
c.logger.Warningf("could not clear rolling update for the statefulset")
306+
}
295307
}
296-
297308
return nil
298309
}
299310

‎pkg/cluster/util.go

-57
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@ func NewSecretOauthTokenGetter(kubeClient *k8sutil.KubernetesClient,
3939
return &SecretOauthTokenGetter{kubeClient, OAuthTokenSecretName}
4040
}
4141

42-
const (
43-
podControllerRevisionHashLabel = "controller-revision-hash"
44-
)
45-
4642
func (g *SecretOauthTokenGetter) getOAuthToken() (string, error) {
4743
//TODO: we can move this function to the Controller in case it will be needed there. As for now we use it only in the Cluster
4844
// Temporary getting postgresql-operator secret from the NamespaceDefault
@@ -462,56 +458,3 @@ func (c *Cluster) GetSpec() (*spec.Postgresql, error) {
462458
func (c *Cluster) patroniUsesKubernetes() bool {
463459
return c.OpConfig.EtcdHost == ""
464460
}
465-
466-
func (c *Cluster) setPendingRollingUpgrade(val bool) {
467-
if c.pendingRollingUpdate == nil {
468-
c.pendingRollingUpdate = new(bool)
469-
}
470-
*c.pendingRollingUpdate = val
471-
c.logger.Debugf("pending rolling upgrade was set to %b", val)
472-
}
473-
474-
// resolvePendingRollingUpdate figures out if rolling upgrade is necessary
475-
// based on the states of the cluster statefulset and pods
476-
func (c *Cluster) resolvePendingRollingUpdate(sset *v1beta1.StatefulSet) error {
477-
// XXX: it looks like we will always trigger a rolling update if the
478-
// pods are on a different revision from a statefulset, even if the
479-
// statefulset change that caused it didn't require a rolling update
480-
// originally.
481-
if c.pendingRollingUpdate != nil {
482-
return nil
483-
}
484-
c.logger.Debugf("evaluating rolling upgrade requirement")
485-
effectiveRevision := sset.Status.UpdateRevision
486-
if effectiveRevision == "" {
487-
if sset.Status.CurrentRevision == "" {
488-
c.logger.Debugf("statefulset doesn't have a current revision, no rolling upgrade")
489-
// the statefulset does not have a currentRevision, it must be new; hence, no rollingUpdate
490-
c.setPendingRollingUpgrade(false)
491-
return nil
492-
}
493-
effectiveRevision = sset.Status.CurrentRevision
494-
}
495-
496-
// fetch all pods related to this cluster
497-
pods, err := c.listPods()
498-
if err != nil {
499-
return err
500-
}
501-
// check their revisions
502-
for _, pod := range pods {
503-
podRevision, present := pod.Labels[podControllerRevisionHashLabel]
504-
// empty or missing revision indicates a new pod - doesn't need a rolling upgrade
505-
if !present || podRevision == "" {
506-
continue
507-
}
508-
c.logger.Debugf("observing pod revision %q vs statefulset revision %q", podRevision, effectiveRevision)
509-
if podRevision != effectiveRevision {
510-
// pod is on a different revision - trigger the rolling upgrade
511-
c.setPendingRollingUpgrade(true)
512-
return nil
513-
}
514-
}
515-
c.setPendingRollingUpgrade(false)
516-
return nil
517-
}

0 commit comments

Comments
 (0)
Please sign in to comment.