public void observe()

in jobs-service/kogito-addons-jobs-service/kogito-addons-quarkus-jobs/src/main/java/org/kie/kogito/jobs/embedded/JobInVMEventPublisher.java [92:128]


    /**
     * Observes an {@link EmbeddedJobServiceEvent} asynchronously and forwards the job it carries
     * to every registered event publisher as a {@link JobInstanceDataEvent}.
     *
     * <p>The job details are adapted to a {@link ScheduledJob}, enriched with the process
     * identifiers found in the recipient payload's job description (when it is a
     * {@link ProcessInstanceJobDescription} or a {@link UserTaskInstanceJobDescription}),
     * serialized to JSON and wrapped in the data event.
     *
     * <p>This method never propagates an exception: a failure while building the event is logged
     * and aborts the propagation, while a failure in a single publisher is logged and does not
     * prevent the remaining publishers from receiving the event.
     *
     * @param serviceEvent the event carrying the job details to propagate
     */
    public void observe(@ObservesAsync EmbeddedJobServiceEvent serviceEvent) {
        JobDetails jobDetails = serviceEvent.getJobDetails();
        LOGGER.debug("Emmit in-vm publishJobStatusChange {}", jobDetails);
        try {
            ScheduledJob scheduledJob = ScheduledJobAdapter.of(jobDetails);
            Recipient<InVMPayloadData> recipient = jobDetails.getRecipient().getRecipient();
            JobDescription jobDescription = recipient.getPayload().getJobDescription();

            // Copy the process correlation identifiers from the job description, when it carries them.
            if (jobDescription instanceof ProcessInstanceJobDescription processInstanceJobDescription) {
                scheduledJob.setProcessInstanceId(processInstanceJobDescription.processInstanceId());
                scheduledJob.setProcessId(processInstanceJobDescription.processId());
                scheduledJob.setRootProcessInstanceId(processInstanceJobDescription.rootProcessInstanceId());
                scheduledJob.setRootProcessId(processInstanceJobDescription.rootProcessId());
                scheduledJob.setNodeInstanceId(processInstanceJobDescription.nodeInstanceId());
            } else if (jobDescription instanceof UserTaskInstanceJobDescription userTaskInstanceJobDescription) {
                scheduledJob.setProcessInstanceId(userTaskInstanceJobDescription.processInstanceId());
                scheduledJob.setProcessId(userTaskInstanceJobDescription.processId());
                scheduledJob.setRootProcessInstanceId(userTaskInstanceJobDescription.rootProcessInstanceId());
                scheduledJob.setRootProcessId(userTaskInstanceJobDescription.rootProcessId());
                scheduledJob.setNodeInstanceId(userTaskInstanceJobDescription.nodeInstanceId());
            }

            byte[] jsonContent = objectMapper.writeValueAsBytes(scheduledJob);
            JobInstanceDataEvent event = new JobInstanceDataEvent(JOB_EVENT_TYPE,
                    url + RestApiConstants.JOBS_PATH,
                    jsonContent,
                    scheduledJob.getProcessInstanceId(),
                    scheduledJob.getRootProcessInstanceId(),
                    scheduledJob.getProcessId(),
                    scheduledJob.getRootProcessId(),
                    null);

            // Each publisher is isolated so that one failing publisher does not stop the others
            // from receiving the event, and so the log names the publisher that actually failed.
            eventPublishers.forEach(publisher -> {
                try {
                    publisher.publish(event);
                } catch (Exception publishFailure) {
                    LOGGER.error("Job status change propagation has failed at eventPublisher: {} execution.",
                            publisher.getClass(), publishFailure);
                }
            });
        } catch (Exception e) {
            // Reached when the event could not be built (adaptation, payload access, serialization)
            // or the publishers could not be iterated; no individual publisher is to blame here.
            LOGGER.error("Job status change propagation has failed for job: {}", jobDetails, e);
        }
    }