in kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/failover/FailoverListenerManager.java [90:110]
public void onChange(final DataChangedEvent event) {
if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.DELETED == event.getType() && instanceNode.isInstancePath(event.getKey())) {
String jobInstanceId = event.getKey().substring(instanceNode.getInstanceFullPath().length() + 1);
if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
return;
}
List<Integer> failoverItems = failoverService.getFailoveringItems(jobInstanceId);
if (!failoverItems.isEmpty()) {
for (int each : failoverItems) {
failoverService.setCrashedFailoverFlagDirectly(each);
executionService.clearRunningInfo(Collections.singletonList(each));
failoverService.failoverIfNecessary();
}
} else {
for (int each : shardingService.getCrashedShardingItems(jobInstanceId)) {
failoverService.setCrashedFailoverFlag(each);
failoverService.failoverIfNecessary();
}
}
}
}