in kernel/src/main/java/org/apache/shardingsphere/elasticjob/kernel/internal/failover/FailoverListenerManager.java [126:155]
public void onChange(final DataChangedEvent event) {
if (!isCurrentInstanceOnline(event) || !isFailoverEnabled()) {
return;
}
Set<JobInstance> availableJobInstances = new HashSet<>(instanceService.getAvailableJobInstances());
if (!isTheOnlyInstance(availableJobInstances)) {
return;
}
Map<Integer, JobInstance> allRunningItems = executionService.getAllRunningItems();
Map<Integer, JobInstance> allFailoveringItems = failoverService.getAllFailoveringItems();
if (allRunningItems.isEmpty() && allFailoveringItems.isEmpty()) {
return;
}
for (Entry<Integer, JobInstance> entry : allFailoveringItems.entrySet()) {
if (!availableJobInstances.contains(entry.getValue())) {
int item = entry.getKey();
failoverService.setCrashedFailoverFlagDirectly(item);
failoverService.clearFailoveringItem(item);
executionService.clearRunningInfo(Collections.singletonList(item));
allRunningItems.remove(item);
}
}
for (Entry<Integer, JobInstance> entry : allRunningItems.entrySet()) {
if (!availableJobInstances.contains(entry.getValue())) {
failoverService.setCrashedFailoverFlag(entry.getKey());
executionService.clearRunningInfo(Collections.singletonList(entry.getKey()));
}
}
failoverService.failoverIfNecessary();
}