public void checkAndRestorePipelineElements()

in streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/health/PipelineHealthCheck.java [63:138]


  /**
   * Runs one health-check cycle over all pipelines.
   *
   * <p>Resets {@code pipelinesStats} and records the number of all, running and stopped
   * pipelines. If any pipeline is running, the instance ids returned by
   * {@code findRunningInstances} for the keys of {@code generateEndpointMap()} are compared
   * against the processors and sinks registered for each running pipeline. Every element that is
   * not reported as running is re-invoked (only if {@code shouldRetry} allows it). The resulting
   * health status and notifications are stored on the pipeline. Afterwards the healthy-pipeline
   * and element counts are set. {@code pipelinesStats.metrics()} is called in every case.
   */
  public void checkAndRestorePipelineElements() {
    List<Pipeline> allPipelines = getAllPipelines();
    List<Pipeline> runningPipelines = getRunningPipelines(allPipelines);

    pipelinesStats.clear();
    pipelinesStats.setAllPipelines(allPipelines.size());
    pipelinesStats.setRunningPipelines(runningPipelines.size());
    pipelinesStats.setStoppedPipelines(pipelinesStats.getAllPipelines() - pipelinesStats.getRunningPipelines());

    if (!runningPipelines.isEmpty()) {
      Map<String, List<InvocableStreamPipesEntity>> endpointMap = generateEndpointMap();
      List<String> allRunningInstances = findRunningInstances(endpointMap.keySet());

      for (Pipeline pipeline : runningPipelines) {
        checkAndRestoreSinglePipeline(pipeline, allRunningInstances);
      }

      // The failed/attention counters are incremented by checkAndRestoreSinglePipeline.
      int healthNum = pipelinesStats.getRunningPipelines() - pipelinesStats.getFailedPipelines()
          - pipelinesStats.getAttentionRequiredPipelines();
      pipelinesStats.setHealthyPipelines(healthNum);
      pipelinesStats.setElementCount(getElementsCount(allPipelines));
    }
    pipelinesStats.metrics();
  }

  /**
   * Checks the registered processors and sinks of one running pipeline and tries to re-invoke
   * every element whose instance id is missing from {@code allRunningInstances}.
   *
   * <p>If at least one re-invocation was attempted, the pipeline is re-read via
   * {@code getPipeline}. It is marked {@code FAILURE} if any attempt failed, otherwise
   * {@code REQUIRES_ATTENTION}. Its notifications are replaced with the ones collected here, and
   * it is written back to storage.
   *
   * @param pipeline            a pipeline that is currently considered running
   * @param allRunningInstances instance ids currently reported as running
   */
  private void checkAndRestoreSinglePipeline(Pipeline pipeline, List<String> allRunningInstances) {
    List<InvocableStreamPipesEntity> graphs = RunningPipelineElementStorage
        .runningProcessorsAndSinks
        .get(pipeline.getPipelineId());

    // Map.get yields null when nothing is registered for this pipeline id. Previously this raised
    // an NPE that aborted the health check (and the stats update) for all remaining pipelines.
    if (graphs == null) {
      LOG.warn("No running pipeline elements registered for pipeline {} ({}), skipping health check",
          pipeline.getName(),
          pipeline.getPipelineId());
      return;
    }

    boolean restoreAttempted = false;
    List<String> failedInstances = new ArrayList<>();
    List<String> recoveredInstances = new ArrayList<>();
    List<String> pipelineNotifications = new ArrayList<>();

    for (InvocableStreamPipesEntity graph : graphs) {
      String instanceId = extractInstanceId(graph);
      // Skip elements that are alive, or whose retry budget (as decided by shouldRetry) is used up.
      if (allRunningInstances.contains(instanceId) || !shouldRetry(instanceId)) {
        continue;
      }

      restoreAttempted = true;
      String endpointUrl = graph.getSelectedEndpointUrl();
      boolean success;
      try {
        endpointUrl = findEndpointUrl(graph);
        success = new InvokeHttpRequest().execute(graph, endpointUrl, pipeline.getPipelineId()).isSuccess();
      } catch (NoServiceEndpointsAvailableException e) {
        // No endpoint is currently available for this element; counted as a failed attempt below.
        success = false;
      }

      if (success) {
        recoveredInstances.add(instanceId);
        addSuccessfulRestoreNotification(pipelineNotifications, graph);
        resetFailedAttempts(instanceId);
        // Remember the endpoint that now hosts the element.
        graph.setSelectedEndpointUrl(endpointUrl);
        LOG.info("Successfully restored pipeline element {} of pipeline {}", graph.getName(),
            pipeline.getName());
      } else {
        failedInstances.add(instanceId);
        addFailedAttemptNotification(pipelineNotifications, graph);
        increaseFailedAttempt(instanceId);
        LOG.info("Could not restore pipeline element {} of pipeline {} ({}/{})",
            graph.getName(),
            pipeline.getName(),
            failedRestartAttempts.get(instanceId),
            MAX_FAILED_ATTEMPTS);
      }
    }

    if (!restoreAttempted) {
      return;
    }

    // Re-read the stored pipeline so the update is applied to its persisted state.
    var currentPipeline = getPipeline(pipeline.getPipelineId());
    if (!failedInstances.isEmpty()) {
      currentPipeline.setHealthStatus(PipelineHealthStatus.FAILURE);
      pipelinesStats.failedIncrease();
    } else if (!recoveredInstances.isEmpty()) {
      currentPipeline.setHealthStatus(PipelineHealthStatus.REQUIRES_ATTENTION);
      pipelinesStats.attentionRequiredIncrease();
    }
    currentPipeline.setPipelineNotifications(pipelineNotifications);
    StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineStorageAPI().updateElement(currentPipeline);
  }