in streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java [63:138]
/**
 * Runs one health-check pass over all pipelines.
 *
 * <p>Refreshes {@code pipelinesStats}: the all / running / stopped counts always, and, when at
 * least one pipeline is running, the failed / attention-required / healthy counts and the element
 * count. The stats are then published via {@code pipelinesStats.metrics()}.
 *
 * <p>For every running pipeline, each registered processor/sink whose instance id is not among
 * the instances currently reported by {@code findRunningInstances} is re-invoked, provided
 * {@code shouldRetry} allows another attempt for that instance. A failed attempt increments the
 * instance's failed-attempt counter; a successful one resets it and stores the endpoint that was
 * used on the element. If any attempt was made for a pipeline, the stored pipeline is reloaded,
 * its health status and notifications are updated, and it is written back to storage.
 */
public void checkAndRestorePipelineElements() {
  List<Pipeline> allPipelines = getAllPipelines();
  List<Pipeline> runningPipelines = getRunningPipelines(allPipelines);
  pipelinesStats.clear();
  pipelinesStats.setAllPipelines(allPipelines.size());
  pipelinesStats.setRunningPipelines(runningPipelines.size());
  pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines());
  if (!runningPipelines.isEmpty()) {
    Map<String, List<InvocableStreamPipesEntity>> endpointMap = generateEndpointMap();
    List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());
    runningPipelines.forEach(pipeline -> {
      List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage
          .runningProcessorsAndSinks
          .get(pipeline.getPipelineId());
      if (graphs == null) {
        // Map#get returns null when no elements are registered for this pipeline id. Without this
        // guard the NPE would abort the whole pass, skipping all remaining pipelines as well as
        // the stats update and metrics publication below.
        LOG.warn("No running pipeline elements registered for pipeline {} ({}), skipping health check",
            pipeline.getName(), pipeline.getPipelineId());
        return; // continue with the next pipeline
      }
      AtomicBoolean shouldUpdatePipeline = new AtomicBoolean(false);
      List<String> failedInstances = new ArrayList<>();
      List<String> recoveredInstances = new ArrayList<>();
      List<String> pipelineNotifications = new ArrayList<>();
      graphs.forEach(graph -> {
        String instanceId = extractInstanceId(graph);
        // Only act on elements that are no longer reported as running and still have retries left;
        // shouldRetry is evaluated only for missing instances (short-circuit).
        if (!allRunningInstances.contains(instanceId) && shouldRetry(instanceId)) {
          String endpointUrl = graph.getSelectedEndpointUrl();
          shouldUpdatePipeline.set(true);
          boolean success;
          try {
            endpointUrl = findEndpointUrl(graph);
            success = new InvokeHttpRequest().execute(graph, endpointUrl, pipeline.getPipelineId()).isSuccess();
          } catch (NoServiceEndpointsAvailableException e) {
            // Treated as a failed attempt below; keep the cause at debug level for diagnosis.
            LOG.debug("No service endpoint available for pipeline element {} of pipeline {}",
                graph.getName(), pipeline.getName(), e);
            success = false;
          }
          if (!success) {
            failedInstances.add(instanceId);
            addFailedAttemptNotification(pipelineNotifications, graph);
            increaseFailedAttempt(instanceId);
            LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
                graph.getName(),
                pipeline.getName(),
                failedRestartAttempts.get(instanceId),
                MAX_FAILED_ATTEMPTS);
          } else {
            recoveredInstances.add(instanceId);
            addSuccessfulRestoreNotification(pipelineNotifications, graph);
            resetFailedAttempts(instanceId);
            // Remember the endpoint that actually accepted the invocation.
            graph.setSelectedEndpointUrl(endpointUrl);
            LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(),
                pipeline.getName());
          }
        }
      });
      if (shouldUpdatePipeline.get()) {
        // Reload the stored pipeline so the update is applied to its current persisted state.
        var currentPipeline = getPipeline(pipeline.getPipelineId());
        if (!failedInstances.isEmpty()) {
          // At least one element could not be restored in this pass.
          currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
          pipelinesStats.failedIncrease();
        } else if (!recoveredInstances.isEmpty()) {
          // Every attempted element was restored, but the pipeline was interrupted.
          currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
          pipelinesStats.attentionRequiredIncrease();
        }
        // NOTE(review): sets only the notifications collected in this pass; confirm whether earlier
        // notifications are meant to be kept.
        currentPipeline.setPipelineNotifications(pipelineNotifications);
        StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updateElement(currentPipeline);
      }
    });
    int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines()
        - pipelinesStats.getAttentionRequiredPipelines();
    pipelinesStats.setHealthyPipelines(healthNum);
    pipelinesStats.setElementCount(getElementsCount(allPipelines));
  }
  pipelinesStats.metrics();
}