in pkg/controller/logstash/reconcile.go [37:115]
func reconcileStatefulSet(params Params, podTemplate corev1.PodTemplateSpec) (*reconciler.Results, logstashv1alpha1.LogstashStatus) {
defer tracing.Span(¶ms.Context)()
results := reconciler.NewResult(params.Context)
ok, _, err := params.expectationsSatisfied(params.Context)
if err != nil {
return results.WithError(err), params.Status
}
if !ok {
return results.WithResult(reconcile.Result{Requeue: true}), params.Status
}
expected := sset.New(sset.Params{
Name: logstashv1alpha1.Name(params.Logstash.Name),
Namespace: params.Logstash.Namespace,
ServiceName: logstashv1alpha1.APIServiceName(params.Logstash.Name),
Selector: params.Logstash.GetIdentityLabels(),
Labels: params.Logstash.GetIdentityLabels(),
PodTemplateSpec: podTemplate,
Replicas: params.Logstash.Spec.Count,
RevisionHistoryLimit: params.Logstash.Spec.RevisionHistoryLimit,
UpdateStrategy: params.Logstash.Spec.UpdateStrategy,
VolumeClaimTemplates: params.Logstash.Spec.VolumeClaimTemplates,
})
recreations, err := volume.RecreateStatefulSets(params.Context, params.Client, params.Logstash)
if err != nil {
if apierrors.IsConflict(err) {
ulog.FromContext(params.Context).V(1).Info("Conflict while recreating stateful set, requeueing", "message", err)
return results.WithResult(reconcile.Result{Requeue: true}), params.Status
}
return results.WithError(fmt.Errorf("StatefulSet recreation: %w", err)), params.Status
}
if recreations > 0 {
// Statefulset is in the process of being recreated to handle PVC expansion:
// it is safer to requeue until the re-creation is done.
// Otherwise, some operation could be performed with wrong assumptions:
// the sset doesn't exist (was just deleted), but the Pods do actually exist.
ulog.FromContext(params.Context).V(1).Info("StatefulSets recreation in progress, re-queueing after 30 seconds.", "namespace", params.Logstash.Namespace, "ls_name", params.Logstash.Name,
"status", params.Status)
return results.WithResult(reconcile.Result{RequeueAfter: 30 * time.Second}), params.Status
}
actualStatefulSet, err := retrieveActualStatefulSet(params.Client, params.Logstash)
notFound := apierrors.IsNotFound(err)
if err != nil && !notFound {
return results.WithError(err), params.Status
}
if !notFound {
recreateSset, err := volume.HandleVolumeExpansion(params.Context, params.Client, params.Logstash, expected, actualStatefulSet, true)
if err != nil {
return results.WithError(err), params.Status
}
if recreateSset {
return results.WithResult(reconcile.Result{Requeue: true}), params.Status
}
}
if err := controllerutil.SetControllerReference(¶ms.Logstash, &expected, scheme.Scheme); err != nil {
return results.WithError(err), params.Status
}
reconciled, err := sset.Reconcile(params.Context, params.Client, expected, params.Logstash, params.Expectations)
if err != nil {
return results.WithError(err), params.Status
}
var status logstashv1alpha1.LogstashStatus
if status, err = calculateStatus(¶ms, reconciled); err != nil {
results.WithError(errors.Wrap(err, "while calculating status"))
}
return results, status
}