func reconcileStatefulSet()

in pkg/controller/logstash/reconcile.go [37:115]


func reconcileStatefulSet(params Params, podTemplate corev1.PodTemplateSpec) (*reconciler.Results, logstashv1alpha1.LogstashStatus) {
	defer tracing.Span(&params.Context)()
	results := reconciler.NewResult(params.Context)

	ok, _, err := params.expectationsSatisfied(params.Context)
	if err != nil {
		return results.WithError(err), params.Status
	}

	if !ok {
		return results.WithResult(reconcile.Result{Requeue: true}), params.Status
	}

	expected := sset.New(sset.Params{
		Name:                 logstashv1alpha1.Name(params.Logstash.Name),
		Namespace:            params.Logstash.Namespace,
		ServiceName:          logstashv1alpha1.APIServiceName(params.Logstash.Name),
		Selector:             params.Logstash.GetIdentityLabels(),
		Labels:               params.Logstash.GetIdentityLabels(),
		PodTemplateSpec:      podTemplate,
		Replicas:             params.Logstash.Spec.Count,
		RevisionHistoryLimit: params.Logstash.Spec.RevisionHistoryLimit,
		UpdateStrategy:       params.Logstash.Spec.UpdateStrategy,
		VolumeClaimTemplates: params.Logstash.Spec.VolumeClaimTemplates,
	})

	recreations, err := volume.RecreateStatefulSets(params.Context, params.Client, params.Logstash)

	if err != nil {
		if apierrors.IsConflict(err) {
			ulog.FromContext(params.Context).V(1).Info("Conflict while recreating stateful set, requeueing", "message", err)
			return results.WithResult(reconcile.Result{Requeue: true}), params.Status
		}
		return results.WithError(fmt.Errorf("StatefulSet recreation: %w", err)), params.Status
	}

	if recreations > 0 {
		// Statefulset is in the process of being recreated to handle PVC expansion:
		// it is safer to requeue until the re-creation is done.
		// Otherwise, some operation could be performed with wrong assumptions:
		// the sset doesn't exist (was just deleted), but the Pods do actually exist.
		ulog.FromContext(params.Context).V(1).Info("StatefulSets recreation in progress, re-queueing after 30 seconds.", "namespace", params.Logstash.Namespace, "ls_name", params.Logstash.Name,
			"status", params.Status)
		return results.WithResult(reconcile.Result{RequeueAfter: 30 * time.Second}), params.Status
	}

	actualStatefulSet, err := retrieveActualStatefulSet(params.Client, params.Logstash)

	notFound := apierrors.IsNotFound(err)
	if err != nil && !notFound {
		return results.WithError(err), params.Status
	}

	if !notFound {
		recreateSset, err := volume.HandleVolumeExpansion(params.Context, params.Client, params.Logstash, expected, actualStatefulSet, true)
		if err != nil {
			return results.WithError(err), params.Status
		}
		if recreateSset {
			return results.WithResult(reconcile.Result{Requeue: true}), params.Status
		}
	}

	if err := controllerutil.SetControllerReference(&params.Logstash, &expected, scheme.Scheme); err != nil {
		return results.WithError(err), params.Status
	}
	reconciled, err := sset.Reconcile(params.Context, params.Client, expected, params.Logstash, params.Expectations)

	if err != nil {
		return results.WithError(err), params.Status
	}

	var status logstashv1alpha1.LogstashStatus

	if status, err = calculateStatus(&params, reconciled); err != nil {
		results.WithError(errors.Wrap(err, "while calculating status"))
	}
	return results, status
}