in jobs-service/jobs-service-common/src/main/java/org/kie/kogito/jobs/service/management/JobServiceInstanceManager.java [146:173]
/**
 * Attempts to take (or keep) leadership for this instance by running an update
 * against the management record identified by {@code info.getId()}.
 *
 * <p>Leadership is claimed when the stored record is {@code null}, has no token,
 * carries the same token as {@code info}, has no last heartbeat, or has a last
 * heartbeat older than {@code heartbeatExpirationInSeconds} relative to the
 * current time. In that case {@code info} is stamped with the current time,
 * this instance is flagged as leader, communication is enabled, the
 * {@code heartbeat} stream is resumed and the {@code checkLeader} stream is paused.
 *
 * <p>Otherwise this instance steps down if it was flagged as leader, the
 * {@code heartbeat} stream is paused and the {@code checkLeader} stream is resumed.
 *
 * @param info the management info of this instance; its last heartbeat is
 *        overwritten when leadership is claimed
 * @param checkLeader stream paused on becoming leader and resumed otherwise
 * @param heartbeat stream resumed on becoming leader and paused otherwise
 * @return the {@code Uni} produced by {@code repository.getAndUpdate}; the update
 *         callback yields {@code info} when leadership is claimed and {@code null}
 *         otherwise (NOTE(review): how the repository treats a {@code null}
 *         result is not visible here - confirm against its implementation)
 */
protected Uni<JobServiceManagementInfo> tryBecomeLeader(JobServiceManagementInfo info, TimeoutStream checkLeader, TimeoutStream heartbeat) {
    LOGGER.debug("Try to become Leader");
    return repository.getAndUpdate(info.getId(), current -> {
        final OffsetDateTime now = DateUtil.now().toOffsetDateTime();
        // Evaluated left to right with short-circuiting, so the null checks guard the later dereferences.
        final boolean leadershipAvailable = Objects.isNull(current)
                || Objects.isNull(current.getToken())
                || Objects.equals(current.getToken(), info.getToken())
                || Objects.isNull(current.getLastHeartbeat())
                || current.getLastHeartbeat().isBefore(now.minusSeconds(heartbeatExpirationInSeconds));

        if (!leadershipAvailable) {
            // The stored record has a different token and a heartbeat that has not expired.
            if (isLeader()) {
                LOGGER.info("Not Leader");
                leader.set(false);
                disableCommunication();
            }
            // stop heartbeats if running
            heartbeat.pause();
            // guarantee the leader check stream is running while not leader
            checkLeader.resume();
            return null;
        }

        // The previous holder is absent or no longer active: claim leadership.
        info.setLastHeartbeat(now);
        LOGGER.info("SET Leader {}", info);
        leader.set(true);
        enableCommunication();
        heartbeat.resume();
        checkLeader.pause();
        return info;
    });
}