func()

in iotdb-operator/internal/controller/datanode_controller.go [123:241]


// constructStateFulSetForDataNode builds the StatefulSet object for the given
// DataNode resource.
//
// The container environment consists of the user-supplied entries from
// dataNode.Spec.Envs (sorted by name, with the port-related keys forced to the
// fixed values that match the declared container ports), followed by the
// operator-managed POD_NAME, dn_seed_config_node and dn_internal_address
// entries.
//
// The returned StatefulSet has dataNode set as its controller reference. If
// setting that reference fails, nil is returned; callers must check for nil.
func (r *DataNodeReconciler) constructStateFulSetForDataNode(dataNode *iotdbv1.DataNode) *appsv1.StatefulSet {
	// The same label set is used for the object, the selector, the pod template
	// and the anti-affinity term.
	labels := map[string]string{"app": DataNodeName}
	replicas := int32(dataNode.Spec.Replicas)

	// Ranging over a nil map is a no-op, so no explicit nil check is needed.
	userEnvs := dataNode.Spec.Envs
	envVars := make([]corev1.EnvVar, 0, len(userEnvs)+3)
	for name, value := range userEnvs {
		// Port settings are not user-configurable: they are pinned to the
		// values exposed in the container's Ports list below.
		switch name {
		case "dn_rpc_port":
			value = "6667"
		case "dn_internal_port":
			value = "10730"
		case "dn_mpp_data_exchange_port":
			value = "10740"
		case "dn_schema_region_consensus_port":
			value = "10750"
		case "dn_data_region_consensus_port":
			value = "10760"
		case "dn_metric_prometheus_reporter_port":
			value = "9092"
		case "rest_service_port":
			value = "18080"
		}
		envVars = append(envVars, corev1.EnvVar{Name: name, Value: value})
	}

	// Go map iteration order is randomized, so without sorting the generated
	// pod template would differ from one call to the next. Sort the user
	// entries by name to keep the output stable. The list is small and names
	// are unique (map keys), so a simple in-place insertion sort is enough and
	// avoids pulling in an extra import.
	for i := 1; i < len(envVars); i++ {
		for j := i; j > 0 && envVars[j].Name < envVars[j-1].Name; j-- {
			envVars[j], envVars[j-1] = envVars[j-1], envVars[j]
		}
	}

	// Operator-managed variables go last. POD_NAME must precede
	// dn_internal_address because the latter references it via $(POD_NAME).
	seedConfigNode := ConfigNodeName + "-0." + ConfigNodeName + "-headless." + dataNode.Namespace + ".svc.cluster.local:10710"
	internalAddress := "$(POD_NAME)." + DataNodeName + "-headless." + dataNode.Namespace + ".svc.cluster.local"
	envVars = append(envVars,
		corev1.EnvVar{
			Name: "POD_NAME",
			ValueFrom: &corev1.EnvVarSource{
				FieldRef: &corev1.ObjectFieldSelector{
					FieldPath: "metadata.name",
				},
			},
		},
		corev1.EnvVar{Name: "dn_seed_config_node", Value: seedConfigNode},
		corev1.EnvVar{Name: "dn_internal_address", Value: internalAddress},
	)

	// A single PVC template backs every mount; each mount uses its own SubPath.
	pvcTemplate := *r.constructPVCForDataNode(dataNode)
	pvcName := pvcTemplate.Name

	statefulset := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      DataNodeName,
			Namespace: dataNode.Namespace,
			Labels:    labels,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			ServiceName: DataNodeName + "-headless",
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					// Required anti-affinity on the hostname topology key
					// against pods carrying the same labels.
					Affinity: &corev1.Affinity{
						PodAntiAffinity: &corev1.PodAntiAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
								{
									LabelSelector: &metav1.LabelSelector{
										MatchLabels: labels,
									},
									TopologyKey: "kubernetes.io/hostname",
								},
							},
						},
					},
					Containers: []corev1.Container{
						{
							Name:            DataNodeName,
							Image:           dataNode.Spec.Image,
							ImagePullPolicy: corev1.PullIfNotPresent,
							Ports: []corev1.ContainerPort{
								{Name: "rpc-port", ContainerPort: 6667},
								{Name: "internal-port", ContainerPort: 10730},
								{Name: "exchange-port", ContainerPort: 10740},
								{Name: "schema-port", ContainerPort: 10750},
								{Name: "data-port", ContainerPort: 10760},
								{Name: "rest-port", ContainerPort: 18080},
								{Name: "metric-port", ContainerPort: 9092},
							},
							Resources: corev1.ResourceRequirements{
								Limits: corev1.ResourceList{
									corev1.ResourceCPU:    *dataNode.Spec.Resources.Limits.Cpu(),
									corev1.ResourceMemory: *dataNode.Spec.Resources.Limits.Memory(),
								},
								// NOTE(review): requests are populated from
								// Spec.Resources.Limits, not from
								// Spec.Resources.Requests. Presumably this is
								// intentional (requests == limits) — confirm.
								Requests: corev1.ResourceList{
									corev1.ResourceCPU:    *dataNode.Spec.Resources.Limits.Cpu(),
									corev1.ResourceMemory: *dataNode.Spec.Resources.Limits.Memory(),
								},
							},
							Env: envVars,
							VolumeMounts: []corev1.VolumeMount{
								{Name: pvcName, MountPath: "/iotdb/data", SubPath: "data"},
								{Name: pvcName, MountPath: "/iotdb/logs", SubPath: "logs"},
								{Name: pvcName, MountPath: "/iotdb/ext", SubPath: "ext"},
								{Name: pvcName, MountPath: "/iotdb/.env", SubPath: ".env"},
								{Name: pvcName, MountPath: "/iotdb/activation", SubPath: "activation"},
							},
						},
					},
				},
			},
			VolumeClaimTemplates: []corev1.PersistentVolumeClaim{pvcTemplate},
		},
	}

	// The signature has no error return, so a failure to set the owner
	// reference is reported to the caller as a nil StatefulSet.
	if err := SetControllerReference(dataNode, statefulset, r.Scheme); err != nil {
		return nil
	}
	return statefulset
}