in server/src/main/java/org/eclipse/jifa/server/service/impl/K8SWorkerScheduler.java [77:181]
/**
 * Asynchronously launches an elastic worker pod and reports its address once the worker is ready.
 *
 * <p>The work runs on a new thread named {@code "Elastic worker - <identity> Starter"}. That thread creates
 * the pod in {@code config.getClusterNamespace()} and then polls the pod until the worker container reports
 * ready. The callback is invoked with exactly one non-null argument:
 * <ul>
 *   <li>{@code (podIP, null)} once the worker container is ready;</li>
 *   <li>{@code (null, cause)} if building/creating the pod or polling fails, or if the pod reaches a
 *       terminal phase ({@code Failed}/{@code Succeeded}) before becoming ready.</li>
 * </ul>
 *
 * @param identity         identifier of the elastic worker; used to derive the pod name and passed to the
 *                         worker through the {@code ELASTIC_WORKER_IDENTITY_ENV_KEY} environment variable
 * @param requestedMemSize memory request for the worker container, passed as a plain Kubernetes quantity
 *                         (a bare number, which Kubernetes interprets as bytes)
 * @param callback         receives the pod IP on success, or the failure cause
 */
public void scheduleAsync(long identity, long requestedMemSize, BiConsumer<String, Throwable> callback) {
    // Only the master node is allowed to schedule workers.
    Validate.isTrue(isMaster());
    new Thread(() -> {
        String hostAddress;
        try {
            String podName = buildPodUniqueName(identity);
            V1Pod pod = buildElasticWorkerPod(podName, identity, requestedMemSize);
            api.createNamespacedPod(config.getClusterNamespace(), pod).execute();
            hostAddress = awaitElasticWorkerReady(podName);
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // Restore the interrupt status that Thread.sleep cleared.
                Thread.currentThread().interrupt();
            }
            if (t instanceof ApiException apiException) {
                // Trailing throwable argument: SLF4J logs the stack trace in addition to the response body.
                log.error("Failed to start elastic worker, response body: {}", apiException.getResponseBody(),
                          apiException);
            } else {
                log.error("Failed to start elastic worker", t);
            }
            callback.accept(null, t);
            return;
        }
        callback.accept(hostAddress, null);
    }, "Elastic worker - " + identity + " Starter").start();
}

/** Delay between two consecutive reads of the worker pod's status while waiting for it to become ready. */
private static final long POD_STATUS_POLL_INTERVAL_MILLIS = 500;

/**
 * Builds the pod manifest for an elastic worker.
 *
 * <p>The pod runs a single container ({@code WORKER_CONTAINER_NAME}) with the following setup:
 * <ul>
 *   <li>The shared storage PVC is mounted at {@code config.getStoragePath()}.</li>
 *   <li>The database connection settings are passed through environment variables.</li>
 *   <li>A startup probe hits the health-check endpoint every 2s after a 5s delay and tolerates 30
 *       failures.</li>
 * </ul>
 * The pod uses {@code restartPolicy=Never}, so a worker that fails its startup probe or exits is not
 * restarted.
 *
 * @param podName          name to give the pod
 * @param identity         worker identity, exported as {@code ELASTIC_WORKER_IDENTITY_ENV_KEY}
 * @param requestedMemSize memory request for the worker container
 * @return the pod manifest, not yet submitted to the cluster
 */
private V1Pod buildElasticWorkerPod(String podName, long identity, long requestedMemSize) {
    V1Volume volume = new V1Volume();
    volume.setName("jifa-pv");
    volume.persistentVolumeClaim(new V1PersistentVolumeClaimVolumeSource().claimName(config.getStoragePVCName()));

    V1Pod pod = new V1Pod();
    pod.metadata(new V1ObjectMeta().name(podName));

    V1ResourceRequirements resourceRequirements = new V1ResourceRequirements()
            .requests(Map.of("memory", new Quantity(String.valueOf(requestedMemSize))));

    V1Probe healthCheck = new V1Probe();
    healthCheck.httpGet(new V1HTTPGetAction().path(HTTP_API_PREFIX + HTTP_HEALTH_CHECK_MAPPING)
                                             .port(new IntOrString(DEFAULT_PORT)))
               .initialDelaySeconds(5)
               .periodSeconds(2)
               .failureThreshold(30);

    V1Container container = new V1Container()
            .name(WORKER_CONTAINER_NAME)
            .image(config.getElasticWorkerImage())
            .imagePullPolicy("Always")
            .addVolumeMountsItem(new V1VolumeMount().name("jifa-pv").mountPath(config.getStoragePath().toString()))
            .addEnvItem(new V1EnvVar().name(ELASTIC_WORKER_IDENTITY_ENV_KEY).value(Long.toString(identity)))
            .addEnvItem(new V1EnvVar().name("MYSQL_HOST").value(config.getDatabaseHost()))
            .addEnvItem(new V1EnvVar().name("MYSQL_DATABASE").value(config.getDatabaseName()))
            .addEnvItem(new V1EnvVar().name("MYSQL_USER").value(config.getDatabaseUser()))
            // NOTE(review): the password is embedded in plain text in the pod spec; consider a Secret
            // referenced via valueFrom.secretKeyRef instead.
            .addEnvItem(new V1EnvVar().name("MYSQL_PASSWORD").value(config.getDatabasePassword()))
            .args(List.of(
                    "--jifa.role=elastic-worker",
                    "--jifa.storage-path=" + config.getStoragePath().toString(),
                    "--jifa.port=" + config.getElasticWorkerPort(),
                    "--jifa.elastic-worker-idle-threshold=" + config.getElasticWorkerIdleThreshold(),
                    "--jifa.cluster-namespace=" + config.getClusterNamespace()))
            .addPortsItem(new V1ContainerPort().containerPort(config.getElasticWorkerPort()))
            .resources(resourceRequirements)
            .startupProbe(healthCheck);

    String jvmOptions = config.getElasticWorkerJVMOptions();
    if (StringUtils.isNotBlank(jvmOptions)) {
        container.addEnvItem(new V1EnvVar().name("JAVA_TOOL_OPTIONS").value(jvmOptions));
    }

    V1PodSpec podSpec = new V1PodSpec().addContainersItem(container).addVolumesItem(volume)
                                       .serviceAccountName(config.getServiceAccountName())
                                       .restartPolicy("Never");
    String imagePullSecretName = config.getImagePullSecretName();
    if (StringUtils.isNotBlank(imagePullSecretName)) {
        podSpec.addImagePullSecretsItem(new V1LocalObjectReference()
                                                .name(imagePullSecretName));
    }
    // workaround for https://github.com/kubernetes-client/java/issues/3076
    podSpec.setOverhead(null);
    pod.spec(podSpec);
    return pod;
}

/**
 * Polls the worker pod until its worker container is ready, then returns the pod IP.
 *
 * <p>The pod is re-read every {@link #POD_STATUS_POLL_INTERVAL_MILLIS} ms instead of in a tight loop. Because
 * the pod is created with {@code restartPolicy=Never}, a {@code Failed} or {@code Succeeded} phase means the
 * worker can never become ready, so polling stops with an exception in that case.
 *
 * <p>NOTE(review): there is no overall deadline. A pod that stays {@code Pending} (e.g. unschedulable) keeps
 * the calling thread polling. Consider bounding this wait if that matters in practice.
 *
 * @param podName name of the pod created in {@code config.getClusterNamespace()}
 * @return the pod IP of the ready worker
 * @throws ApiException          if reading the pod fails
 * @throws InterruptedException  if the thread is interrupted while waiting between polls
 * @throws IllegalStateException if the pod reaches a terminal phase before the worker becomes ready
 */
private String awaitElasticWorkerReady(String podName) throws ApiException, InterruptedException {
    String namespace = config.getClusterNamespace();
    while (true) {
        V1Pod pod = api.readNamespacedPod(podName, namespace).execute();
        V1PodStatus status = pod.getStatus();
        if (status != null) {
            String phase = status.getPhase();
            if ("Failed".equals(phase) || "Succeeded".equals(phase)) {
                throw new IllegalStateException(
                        "Elastic worker pod " + podName + " terminated in phase " + phase
                        + (status.getReason() != null ? ", reason: " + status.getReason() : "")
                        + (status.getMessage() != null ? ", message: " + status.getMessage() : ""));
            }
            String podIP = status.getPodIP();
            if (podIP != null && isWorkerContainerReady(status)) {
                return podIP;
            }
        }
        Thread.sleep(POD_STATUS_POLL_INTERVAL_MILLIS);
    }
}

/**
 * Tells whether the container named {@code WORKER_CONTAINER_NAME} reports {@code ready == true}.
 * A missing status list or a null {@code ready} flag counts as not ready.
 */
private boolean isWorkerContainerReady(V1PodStatus status) {
    List<V1ContainerStatus> containerStatuses = status.getContainerStatuses();
    if (containerStatuses == null) {
        return false;
    }
    for (V1ContainerStatus containerStatus : containerStatuses) {
        if (WORKER_CONTAINER_NAME.equals(containerStatus.getName())
            && Boolean.TRUE.equals(containerStatus.getReady())) {
            return true;
        }
    }
    return false;
}