public void scheduleAsync()

in server/src/main/java/org/eclipse/jifa/server/service/impl/K8SWorkerScheduler.java [77:181]


    /**
     * Creates an elastic worker pod for the given identity and waits, on a dedicated thread,
     * until the pod has an IP address and its worker container reports ready.
     *
     * <p>The calling thread only validates that this node is the master and starts the starter
     * thread; all Kubernetes API calls and the callback invocation happen on that thread.
     *
     * <p>Waiting is done by polling the pod with a short pause between reads. The pod is created
     * with restart policy {@code Never}, so once it reaches a terminal phase ({@code Failed} or
     * {@code Succeeded}) it can never become ready; that case is reported to the callback as an
     * {@link IllegalStateException} instead of being polled forever. A pod that stays in a
     * non-terminal phase (for example {@code Pending}) is still awaited without a deadline.
     *
     * @param identity         identity of the elastic worker; used to build the pod name and passed to
     *                         the container through an environment variable
     * @param requestedMemSize value used for the container's {@code memory} resource request
     * @param callback         receives {@code (podIP, null)} once the worker container is ready, or
     *                         {@code (null, throwable)} if anything fails while starting the worker
     */
    public void scheduleAsync(long identity, long requestedMemSize, BiConsumer<String, Throwable> callback) {
        Validate.isTrue(isMaster());
        new Thread(() -> {
            // Pause between two consecutive pod reads, so waiting neither spins a CPU core
            // nor floods the API server with requests.
            final long pollIntervalMillis = 500;

            String hostAddress;
            try {
                V1Volume volume = new V1Volume();
                volume.setName("jifa-pv");
                volume.persistentVolumeClaim(new V1PersistentVolumeClaimVolumeSource().claimName(config.getStoragePVCName()));

                String podName = buildPodUniqueName(identity);

                V1Pod pod = new V1Pod();
                pod.metadata(new V1ObjectMeta().name(podName));

                V1ResourceRequirements resourceRequirements = new V1ResourceRequirements()
                        .requests(Map.of("memory", new Quantity(String.valueOf(requestedMemSize))));

                // Startup probe: first check after 5s, then every 2s, tolerating up to 30 failures.
                V1Probe healthCheck = new V1Probe();
                healthCheck.httpGet(new V1HTTPGetAction().path(HTTP_API_PREFIX + HTTP_HEALTH_CHECK_MAPPING)
                                                         .port(new IntOrString(DEFAULT_PORT)))
                           .initialDelaySeconds(5)
                           .periodSeconds(2)
                           .failureThreshold(30);

                V1Container container = new V1Container()
                        .name(WORKER_CONTAINER_NAME)
                        .image(config.getElasticWorkerImage())
                        .imagePullPolicy("Always")
                        .addVolumeMountsItem(new V1VolumeMount().name("jifa-pv").mountPath(config.getStoragePath().toString()))
                        .addEnvItem(new V1EnvVar().name(ELASTIC_WORKER_IDENTITY_ENV_KEY).value(Long.toString(identity)))
                        .addEnvItem(new V1EnvVar().name("MYSQL_HOST").value(config.getDatabaseHost()))
                        .addEnvItem(new V1EnvVar().name("MYSQL_DATABASE").value(config.getDatabaseName()))
                        .addEnvItem(new V1EnvVar().name("MYSQL_USER").value(config.getDatabaseUser()))
                        .addEnvItem(new V1EnvVar().name("MYSQL_PASSWORD").value(config.getDatabasePassword()))
                        .args(List.of(
                                "--jifa.role=elastic-worker",
                                "--jifa.storage-path=" + config.getStoragePath().toString(),
                                "--jifa.port=" + config.getElasticWorkerPort(),
                                "--jifa.elastic-worker-idle-threshold=" + config.getElasticWorkerIdleThreshold(),
                                "--jifa.cluster-namespace=" + config.getClusterNamespace()))
                        .addPortsItem(new V1ContainerPort().containerPort(config.getElasticWorkerPort()))
                        .resources(resourceRequirements)
                        .startupProbe(healthCheck);

                String jvmOptions = config.getElasticWorkerJVMOptions();
                if (StringUtils.isNotBlank(jvmOptions)) {
                    container.addEnvItem(new V1EnvVar().name("JAVA_TOOL_OPTIONS").value(jvmOptions));
                }

                V1PodSpec podSpec = new V1PodSpec().addContainersItem(container).addVolumesItem(volume)
                                                   .serviceAccountName(config.getServiceAccountName())
                                                   .restartPolicy("Never");

                String imagePullSecretName = config.getImagePullSecretName();
                if (StringUtils.isNotBlank(imagePullSecretName)) {
                    podSpec.addImagePullSecretsItem(new V1LocalObjectReference()
                            .name(imagePullSecretName));
                }

                // workaround for https://github.com/kubernetes-client/java/issues/3076
                podSpec.setOverhead(null);
                pod.spec(podSpec);

                api.createNamespacedPod(config.getClusterNamespace(), pod).execute();

                // Phase 1: wait until the pod has been assigned an IP address.
                while (true) {
                    pod = api.readNamespacedPod(podName, config.getClusterNamespace()).execute();
                    V1PodStatus status = pod.getStatus();
                    String podIP = status != null ? status.getPodIP() : null;
                    if (podIP != null) {
                        hostAddress = podIP;
                        break;
                    }
                    failIfPodTerminated(podName, status);
                    Thread.sleep(pollIntervalMillis);
                }

                // Phase 2: wait until the worker container reports ready.
                while (!isWorkerContainerReady(pod.getStatus())) {
                    failIfPodTerminated(podName, pod.getStatus());
                    Thread.sleep(pollIntervalMillis);
                    pod = api.readNamespacedPod(podName, config.getClusterNamespace()).execute();
                }
            } catch (Throwable t) {
                if (t instanceof InterruptedException) {
                    // Restore the interrupt flag so code running later on this thread can observe it.
                    Thread.currentThread().interrupt();
                }
                if (t instanceof ApiException apiException) {
                    // The trailing throwable argument makes the logger print the stack trace as well.
                    log.error("Failed to start elastic worker, response body: {}", apiException.getResponseBody(), apiException);
                } else {
                    log.error("Failed to start elastic worker", t);
                }
                callback.accept(null, t);
                return;
            }

            callback.accept(hostAddress, null);
        }, "Elastic worker - " + identity + " Starter").start();
    }

    /**
     * Tells whether the worker container is listed in the given pod status and reports ready.
     *
     * @param status pod status to inspect, may be {@code null}
     * @return {@code true} only if a container status named {@code WORKER_CONTAINER_NAME} exists
     *         and its ready flag is {@code true}; a missing status or a {@code null} flag yields {@code false}
     */
    private boolean isWorkerContainerReady(V1PodStatus status) {
        if (status == null) {
            return false;
        }
        List<V1ContainerStatus> containerStatuses = status.getContainerStatuses();
        if (containerStatuses == null) {
            return false;
        }
        for (V1ContainerStatus containerStatus : containerStatuses) {
            // getReady() is a boxed Boolean; compare null-safely instead of auto-unboxing.
            if (WORKER_CONTAINER_NAME.equals(containerStatus.getName())
                && Boolean.TRUE.equals(containerStatus.getReady())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Aborts waiting when the pod has reached a terminal phase.
     *
     * @param podName name of the pod, used in the exception message
     * @param status  pod status to inspect, may be {@code null}
     * @throws IllegalStateException if the pod phase is {@code Failed} or {@code Succeeded}
     */
    private void failIfPodTerminated(String podName, V1PodStatus status) {
        String phase = status != null ? status.getPhase() : null;
        if ("Failed".equals(phase) || "Succeeded".equals(phase)) {
            throw new IllegalStateException(
                    "Elastic worker pod " + podName + " terminated before becoming ready, phase: " + phase);
        }
    }