in dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/eventbus/TaskExecutorEventBusCoordinator.java [131:197]
/**
 * Takes at most one lifecycle event off the given executor's event bus and hands it to every
 * listener in {@code taskExecutorLifecycleEventListeners}, choosing the listener callback from
 * the event's type.
 *
 * <p>The whole call runs inside the scope returned by
 * {@link TaskExecutorMDCUtils#logWithMDC}, which is closed when the method exits.
 *
 * <p>Nothing happens when the bus reports itself empty or when {@code poll()} yields no event.
 * Any {@link Exception} raised while notifying listeners (including the
 * {@link IllegalArgumentException} for an unrecognised event type) or while writing the success
 * log is caught and logged here; it is not rethrown, the already-polled event is not put back,
 * and listeners later in the iteration order are not notified for that event.
 *
 * @param taskExecutor the executor whose event bus should be drained by one event
 */
private void doFireTaskExecutorEventBus(final ITaskExecutor taskExecutor) {
    try (final TaskExecutorMDCUtils.MDCAutoClosable ignored = TaskExecutorMDCUtils.logWithMDC(taskExecutor)) {
        final TaskExecutorEventBus eventBus = taskExecutor.getTaskExecutorEventBus();
        if (eventBus.isEmpty()) {
            return;
        }

        // The bus may still hand back nothing even though isEmpty() was false a moment ago.
        final Optional<AbstractTaskExecutorLifecycleEvent> polled = eventBus.poll();
        if (!polled.isPresent()) {
            return;
        }

        final ITaskExecutorLifecycleEvent event = polled.get();
        try {
            for (final ITaskExecutorLifecycleEventListener listener : taskExecutorLifecycleEventListeners) {
                // Each event type maps to exactly one listener callback taking the concrete event class.
                switch (event.getType()) {
                    case DISPATCHED:
                        listener.onTaskExecutorDispatchedLifecycleEvent((TaskExecutorDispatchedLifecycleEvent) event);
                        break;
                    case RUNNING:
                        listener.onTaskExecutorStartedLifecycleEvent((TaskExecutorStartedLifecycleEvent) event);
                        break;
                    case RUNTIME_CONTEXT_CHANGE:
                        listener.onTaskExecutorRuntimeContextChangedEvent(
                                (TaskExecutorRuntimeContextChangedLifecycleEvent) event);
                        break;
                    case PAUSE:
                        listener.onTaskExecutorPauseLifecycleEvent((TaskExecutorPauseLifecycleEvent) event);
                        break;
                    case PAUSED:
                        listener.onTaskExecutorPausedLifecycleEvent((TaskExecutorPausedLifecycleEvent) event);
                        break;
                    case KILL:
                        listener.onTaskExecutorKillLifecycleEvent((TaskExecutorKillLifecycleEvent) event);
                        break;
                    case KILLED:
                        listener.onTaskExecutorKilledLifecycleEvent((TaskExecutorKilledLifecycleEvent) event);
                        break;
                    case SUCCESS:
                        listener.onTaskExecutorSuccessLifecycleEvent((TaskExecutorSuccessLifecycleEvent) event);
                        break;
                    case FAILED:
                        listener.onTaskExecutorFailLifecycleEvent((TaskExecutorFailedLifecycleEvent) event);
                        break;
                    case FINALIZE:
                        listener.onTaskExecutorFinalizeLifecycleEvent((TaskExecutorFinalizeLifecycleEvent) event);
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported TaskExecutorLifecycleEvent: " + event);
                }
            }

            final String eventName = event.getClass().getSimpleName();
            log.info("Success fire {}: {} ", eventName, JSONUtils.toPrettyJsonString(event));
        } catch (Exception ex) {
            // Deliberately contained: a failing listener must not propagate out of the fire call.
            log.error("Fire TaskExecutorLifecycleEvent: {} error", event, ex);
        }
    }
}