public static void sendJobCache()

in bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java [84:127]


    /**
     * Builds the job cache payload and sends it to every host resolved from the given hostnames.
     *
     * <p>One {@code JobCacheRequest} is built per host returned by
     * {@code hostDao.findAllByHostnames(hostnames)} and delivered through a blocking gRPC
     * {@code save} call that runs asynchronously. This method blocks until every call has
     * finished and succeeds only if every host replied with
     * {@code MessageConstants.SUCCESS_CODE}.
     *
     * @param jobId     id of the job, set on every request
     * @param hostnames hostnames used to look up the target hosts
     * @throws ServerException if at least one host did not acknowledge the cache (the call threw,
     *                         the reply was {@code null}, or the reply code was not the success
     *                         code); the message lists the failed hostnames and any underlying
     *                         exceptions are attached as suppressed exceptions
     */
    public static void sendJobCache(Long jobId, List<String> hostnames) {
        if (!INITIALIZED.get()) {
            initialize();
        }

        JobCachePayload payload = new JobCachePayload();
        genGlobalPayload(payload);

        // Sort by cluster id to avoid regenerating the same cluster payload
        List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
        hostPOList.sort(Comparator.comparing(HostPO::getClusterId));

        // Parallel lists: targetHostnames.get(i) is the host that futures.get(i) talks to,
        // so a failure can be reported against a concrete host.
        List<String> targetHostnames = new ArrayList<>();
        List<CompletableFuture<Boolean>> futures = new ArrayList<>();
        for (HostPO hostPO : hostPOList) {
            genClusterPayload(payload, hostPO.getClusterId());
            // Serialize before going async: the same payload object is handed to
            // genClusterPayload again on the next iteration.
            JobCacheRequest request = JobCacheRequest.newBuilder()
                    .setJobId(jobId)
                    .setPayload(JsonUtils.writeAsString(payload))
                    .build();
            targetHostnames.add(hostPO.getHostname());
            futures.add(CompletableFuture.supplyAsync(() -> {
                JobCacheServiceGrpc.JobCacheServiceBlockingStub stub = GrpcClient.getBlockingStub(
                        hostPO.getHostname(),
                        hostPO.getGrpcPort(),
                        JobCacheServiceGrpc.JobCacheServiceBlockingStub.class);
                JobCacheReply reply = stub.save(request);
                return reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE;
            }));
        }

        List<String> failedHostnames = new ArrayList<>();
        List<Throwable> failureCauses = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            boolean success;
            try {
                success = futures.get(i).get();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                failureCauses.add(e);
                success = false;
            } catch (Exception e) {
                // Keep the cause instead of silently turning it into a bare "false".
                failureCauses.add(e);
                success = false;
            }
            if (!success) {
                failedHostnames.add(targetHostnames.get(i));
            }
        }

        if (!failedHostnames.isEmpty()) {
            ServerException exception =
                    new ServerException("Failed to send job cache to hosts: " + failedHostnames);
            failureCauses.forEach(exception::addSuppressed);
            throw exception;
        }
    }