in bigtop-manager-server/src/main/java/org/apache/bigtop/manager/server/command/helper/JobCacheHelper.java [84:127]
public static void sendJobCache(Long jobId, List<String> hostnames) {
if (!INITIALIZED.get()) {
initialize();
}
JobCachePayload payload = new JobCachePayload();
genGlobalPayload(payload);
// Sort by cluster id to avoid regenerating the same cluster payload
List<HostPO> hostPOList = hostDao.findAllByHostnames(hostnames);
hostPOList.sort(Comparator.comparing(HostPO::getClusterId));
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (HostPO hostPO : hostPOList) {
genClusterPayload(payload, hostPO.getClusterId());
JobCacheRequest request = JobCacheRequest.newBuilder()
.setJobId(jobId)
.setPayload(JsonUtils.writeAsString(payload))
.build();
futures.add(CompletableFuture.supplyAsync(() -> {
JobCacheServiceGrpc.JobCacheServiceBlockingStub stub = GrpcClient.getBlockingStub(
hostPO.getHostname(),
hostPO.getGrpcPort(),
JobCacheServiceGrpc.JobCacheServiceBlockingStub.class);
JobCacheReply reply = stub.save(request);
return reply != null && reply.getCode() == MessageConstants.SUCCESS_CODE;
}));
}
List<Boolean> results = futures.stream()
.map((future) -> {
try {
return future.get();
} catch (Exception e) {
return false;
}
})
.toList();
boolean allSuccess = results.stream().allMatch(Boolean::booleanValue);
if (!allSuccess) {
throw new ServerException("Failed to send job cache");
}
}