in jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/EmbeddedJobExecutor.java [109:143]
private Uni<JobExecutionResponse> processJobDescription(JobDetails jobDetails, ProcessInstanceJobDescription processInstanceJobDescription) {
String timerId = processInstanceJobDescription.timerId();
String processInstanceId = processInstanceJobDescription.processInstanceId();
Optional<Process<? extends Model>> process = processes.get().processByProcessInstanceId(processInstanceId);
if (process.isEmpty()) {
return Uni.createFrom().item(
JobExecutionResponse.builder()
.code("401")
.jobId(jobDetails.getId())
.now()
.message("job does not belong to this container")
.build());
}
Integer limit = jobDetails.getRetries();
Supplier<Boolean> execute = () -> executeInUnitOfWork(application.unitOfWorkManager(), () -> {
TriggerJobCommand command = new TriggerJobCommand(processInstanceId, jobDetails.getCorrelationId(), timerId, limit, process.get(), application.unitOfWorkManager());
return command.execute();
});
return Uni.createFrom()
.item(execute)
.onFailure()
.transform(
unexpected -> new JobExecutionException(jobDetails.getId(), "Unexpected error when executing Embedded request for job: " + jobDetails.getId() + ". " + unexpected.getMessage(),
unexpected))
.onItem()
.transform(res -> JobExecutionResponse.builder()
.message("Embedded job executed")
.code(String.valueOf(200))
.now()
.jobId(jobDetails.getId())
.build());
}