in iotdb-operator/internal/controller/datanode_controller.go [123:241]
func (r *DataNodeReconciler) constructStateFulSetForDataNode(dataNode *iotdbv1.DataNode) *appsv1.StatefulSet {
labels := map[string]string{"app": DataNodeName}
replicas := int32(dataNode.Spec.Replicas)
envVars := make([]corev1.EnvVar, 3)
envNum := 0
if dataNode.Spec.Envs != nil {
envNum = len(dataNode.Spec.Envs)
envVars = make([]corev1.EnvVar, len(dataNode.Spec.Envs)+3)
i := 0
for key, value := range dataNode.Spec.Envs {
if key == "dn_rpc_port" {
value = "6667"
} else if key == "dn_internal_port" {
value = "10730"
} else if key == "dn_mpp_data_exchange_port" {
value = "10740"
} else if key == "dn_schema_region_consensus_port" {
value = "10750"
} else if key == "dn_data_region_consensus_port" {
value = "10760"
} else if key == "dn_metric_prometheus_reporter_port" {
value = "9092"
} else if key == "rest_service_port" {
value = "18080"
}
envVars[i] = corev1.EnvVar{Name: key, Value: value}
i++
}
}
envVars[envNum] = corev1.EnvVar{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
}
val1 := ConfigNodeName + "-0." + ConfigNodeName + "-headless." + dataNode.Namespace + ".svc.cluster.local:10710"
val2 := "$(POD_NAME)." + DataNodeName + "-headless." + dataNode.Namespace + ".svc.cluster.local"
envVars[envNum+1] = corev1.EnvVar{Name: "dn_seed_config_node", Value: val1}
envVars[envNum+2] = corev1.EnvVar{Name: "dn_internal_address", Value: val2}
pvcTemplate := *r.constructPVCForDataNode(dataNode)
pvcName := pvcTemplate.Name
statefulset := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: DataNodeName,
Namespace: dataNode.Namespace,
Labels: labels,
},
Spec: appsv1.StatefulSetSpec{
Replicas: &replicas,
Selector: &metav1.LabelSelector{
MatchLabels: labels,
},
ServiceName: DataNodeName + "-headless",
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Spec: corev1.PodSpec{
Affinity: &corev1.Affinity{
PodAntiAffinity: &corev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{
{
LabelSelector: &metav1.LabelSelector{
MatchLabels: labels,
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
Containers: []corev1.Container{
{
Name: DataNodeName,
Image: dataNode.Spec.Image,
ImagePullPolicy: corev1.PullIfNotPresent,
Ports: []corev1.ContainerPort{
{Name: "rpc-port", ContainerPort: 6667},
{Name: "internal-port", ContainerPort: 10730},
{Name: "exchange-port", ContainerPort: 10740},
{Name: "schema-port", ContainerPort: 10750},
{Name: "data-port", ContainerPort: 10760},
{Name: "rest-port", ContainerPort: 18080},
{Name: "metric-port", ContainerPort: 9092},
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: *dataNode.Spec.Resources.Limits.Cpu(),
corev1.ResourceMemory: *dataNode.Spec.Resources.Limits.Memory(),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: *dataNode.Spec.Resources.Limits.Cpu(),
corev1.ResourceMemory: *dataNode.Spec.Resources.Limits.Memory(),
},
},
Env: envVars,
VolumeMounts: []corev1.VolumeMount{
{Name: pvcName, MountPath: "/iotdb/data", SubPath: "data"},
{Name: pvcName, MountPath: "/iotdb/logs", SubPath: "logs"},
{Name: pvcName, MountPath: "/iotdb/ext", SubPath: "ext"},
{Name: pvcName, MountPath: "/iotdb/.env", SubPath: ".env"},
{Name: pvcName, MountPath: "/iotdb/activation", SubPath: "activation"},
},
},
},
},
},
VolumeClaimTemplates: []corev1.PersistentVolumeClaim{pvcTemplate},
},
}
err := SetControllerReference(dataNode, statefulset, r.Scheme)
if err != nil {
return nil
}
return statefulset
}