in pkg/controller/elasticsearch/nodespec/desired_nodes.go [130:215]
// withStorage builds one nodeResource per Pod name of the given StatefulSet.
// Each entry carries the builder's cpu and memory together with a storage
// value resolved for the Elasticsearch data volume.
//
// The data volume is located in three steps:
//  1. path.data is read from config; it must be set and must be a single string.
//  2. the volume mount of esContainer whose MountPath equals path.data gives
//     the volume name.
//  3. the volume claim template of the StatefulSet with that name gives the
//     claimed storage (through getClaimedStorage).
//
// For each Pod the PVC named "<claim name>-<pod name>" is then fetched from
// the StatefulSet namespace:
//   - PVC not found: the claimed storage of the template is used and requeue is set.
//   - PVC has a storage capacity in its status: that capacity is used.
//   - otherwise: requeue is set and the storage request of the PVC spec is
//     used, falling back to the claimed storage of the template.
//
// Validation failures are recorded with addReason and returned through
// toError. An error from unpacking the configuration, or any error other than
// "not found" while fetching a PVC, is returned as is.
func (n nodeSetResourcesBuilder) withStorage(
	ctx context.Context,
	k8sClient k8s.Client,
	statefulSet appsv1.StatefulSet,
	config settings.CanonicalConfig,
	esContainer *corev1.Container,
) (nodeResources, error) {
	var p pathSetting
	if err := config.CanonicalConfig.Unpack(&p); err != nil {
		return nil, err
	}
	if p.PathData == nil {
		return nil, n.addReason("Elasticsearch path.data must be set").toError()
	}
	pathData, ok := p.PathData.(string)
	if !ok {
		return nil, n.addReason("Elasticsearch path.data must be a string, multiple paths are not supported").toError()
	}

	// Guard against a missing container instead of panicking on the
	// dereference below.
	if esContainer == nil {
		return nil, n.addReason("Elasticsearch container must not be nil").toError()
	}

	// Find the volume mounted at path.data. There is no early exit: if several
	// mounts share the path, the last one is used.
	var volumeName string
	for _, mount := range esContainer.VolumeMounts {
		if mount.MountPath == pathData {
			volumeName = mount.Name
		}
	}
	if len(volumeName) == 0 {
		return nil, n.addReason(fmt.Sprintf("Elasticsearch path.data %s must be mounted by a volume", pathData)).toError()
	}

	// Find the claim template backing that volume. Index the slice to avoid
	// copying every template; only the matching one is copied so that the
	// pointer never aliases the StatefulSet. As above, the last match is used.
	var esDataVolumeClaim *corev1.PersistentVolumeClaim
	for i := range statefulSet.Spec.VolumeClaimTemplates {
		if statefulSet.Spec.VolumeClaimTemplates[i].Name == volumeName {
			claim := statefulSet.Spec.VolumeClaimTemplates[i]
			esDataVolumeClaim = &claim
		}
	}
	if esDataVolumeClaim == nil {
		return nil, n.addReason(fmt.Sprintf("Volume claim with name %q not found in Spec.VolumeClaimTemplates", volumeName)).toError()
	}

	claimedStorage := getClaimedStorage(*esDataVolumeClaim)
	if claimedStorage == nil {
		return nil, n.addReason(fmt.Sprintf("no storage request in claim %q", esDataVolumeClaim.Name)).toError()
	}

	// Stop here if there is at least one reason to not compute the desired state.
	if err := n.toError(); err != nil {
		return nil, err
	}

	nodeResources := make([]nodeResource, sset.GetReplicas(statefulSet))
	for i, podName := range sset.PodNames(statefulSet) {
		nodeResources[i].nodeName = podName
		nodeResources[i].cpu = n.cpu
		nodeResources[i].memory = n.memory

		pvcName := fmt.Sprintf("%s-%s", esDataVolumeClaim.Name, podName)
		pvc := corev1.PersistentVolumeClaim{}
		if err := k8sClient.Get(ctx, types.NamespacedName{Namespace: statefulSet.Namespace, Name: pvcName}, &pvc); err != nil {
			if apierrors.IsNotFound(err) {
				// The PVC does not exist (yet): report the size requested by
				// the template and flag the node for a requeue.
				nodeResources[i].requeue = true
				nodeResources[i].storage = *claimedStorage
				continue
			}
			return nil, err
		}

		// Prefer the capacity reported in the PVC status.
		if storageInStatus, exists := pvc.Status.Capacity[corev1.ResourceStorage]; exists {
			nodeResources[i].storage = storageInStatus.Value()
			continue
		}

		// No capacity in the status: flag the node for a requeue and fall back
		// to the request in the PVC spec.
		nodeResources[i].requeue = true
		if storageInSpec, exists := pvc.Spec.Resources.Requests[corev1.ResourceStorage]; exists {
			nodeResources[i].storage = storageInSpec.Value()
		} else {
			// The PVC exists but Spec.Resources.Requests has no storage entry;
			// this is unlikely for a PVC, fall back to the claimed storage.
			nodeResources[i].storage = *claimedStorage
		}
	}
	return nodeResources, nil
}