in jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs-service-embedded/runtime/src/main/java/org/kie/kogito/addons/quarkus/jobs/service/embedded/stream/EventPublisherJobStreams.java [79:112]
public JobDetails publishJobStatusChange(JobDetails jobDetails) {
try {
managedExecutor.runAsync(() -> {
if (eventPublisher != null) {
ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
byte[] jsonContent;
try {
jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
} catch (Exception e) {
throw new JobsServiceException("It was not possible to serialize scheduledJob to json: " + scheduledJob, e);
}
JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE,
url + RestApiConstants.JOBS_PATH,
jsonContent,
scheduledJob.getProcessInstanceId(),
scheduledJob.getRootProcessInstanceId(),
scheduledJob.getProcessId(),
scheduledJob.getRootProcessId(),
null);
try {
eventPublisher.forEach(e -> e.publish(event));
} catch (Exception e) {
LOGGER.error("Job status change propagation has failed at eventPublisher: " + eventPublisher.getClass() + " execution.", e);
}
}
}).get();
} catch (InterruptedException | ExecutionException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException("Job status change propagation has failed.", e);
}
return jobDetails;
}