func()

in pkg/controller/elasticsearch/nodespec/desired_nodes.go [130:215]


func (n nodeSetResourcesBuilder) withStorage(
	ctx context.Context,
	k8sClient k8s.Client,
	statefulSet appsv1.StatefulSet,
	config settings.CanonicalConfig,
	esContainer *corev1.Container,
) (nodeResources, error) {
	var p pathSetting
	if err := config.CanonicalConfig.Unpack(&p); err != nil {
		return nil, err
	}
	if p.PathData == nil {
		return nil, n.addReason("Elasticsearch path.data must be a set").toError()
	}
	pathData, ok := p.PathData.(string)
	if !ok {
		return nil, n.addReason("Elasticsearch path.data must be a string, multiple paths is not supported").toError()
	}

	var volumeName string
	for _, mount := range esContainer.VolumeMounts {
		if mount.MountPath == pathData {
			volumeName = mount.Name
			continue
		}
	}
	if len(volumeName) == 0 {
		return nil, n.addReason(fmt.Sprintf("Elasticsearch path.data %s must mounted by a volume", pathData)).toError()
	}

	var esDataVolumeClaim *corev1.PersistentVolumeClaim
	for _, pvc := range statefulSet.Spec.VolumeClaimTemplates {
		if pvc.Name == volumeName {
			pvc := pvc // return a pointer on a copy
			esDataVolumeClaim = &pvc
			continue
		}
	}

	if esDataVolumeClaim == nil {
		return nil, n.addReason(fmt.Sprintf("Volume claim with name %q not found in Spec.VolumeClaimTemplates ", volumeName)).toError()
	}

	claimedStorage := getClaimedStorage(*esDataVolumeClaim)
	if claimedStorage == nil {
		return nil, n.addReason(fmt.Sprintf("no storage request in claim %q", esDataVolumeClaim.Name)).toError()
	}

	// Stop here if there is at least one reason to not compute the desired state.
	if err := n.toError(); err != nil {
		return nil, err
	}

	nodeResources := make([]nodeResource, sset.GetReplicas(statefulSet))
	for i, podName := range sset.PodNames(statefulSet) {
		nodeResources[i].nodeName = podName
		nodeResources[i].cpu = n.cpu
		nodeResources[i].memory = n.memory
		pvcName := fmt.Sprintf("%s-%s", esDataVolumeClaim.Name, podName)
		pvc := corev1.PersistentVolumeClaim{}
		if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: statefulSet.Namespace, Name: pvcName}, &pvc); err != nil {
			if apierrors.IsNotFound(err) {
				// PVC does not exist (yet)
				nodeResources[i].requeue = true
				nodeResources[i].storage = *claimedStorage
				continue
			}
			return nil, err
		}
		// We first attempt to read the PVC status
		if storageInStatus, exists := pvc.Status.Capacity[corev1.ResourceStorage]; exists {
			nodeResources[i].storage = storageInStatus.Value()
			continue
		}
		// If there is no storage value in the status use the value in the spec
		nodeResources[i].requeue = true
		if storageInSpec, exists := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; exists {
			nodeResources[i].storage = storageInSpec.Value()
		} else {
			// PVC does exist, but Spec.Resources.Requests is empty, this is unlikely to happen for a PVC, fall back to claimed storage.
			nodeResources[i].storage = *claimedStorage
		}
	}

	return nodeResources, nil
}