public void gcPipelines()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/pipelines/PipelineManager.java [307:406]
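
Garbage-collects pipelines in three steps: (1) under the lock, move pipelines with no running or expected jobs from runningPipelineMap into gcPipelineMap; (2) outside the lock, try to stop each pipeline queued for GC, recording per-pipeline close latency and success/failure metrics; (3) remove pipelines that are confirmed stopped and emit aggregated counters.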


  public void gcPipelines() {
    int removedCount = 0;
    int closedCount = 0;

    // Only synchronize on the runningPipelineMap modification; stopping the actual
    // pipelines queued for GC can happen outside the lock.
    synchronized (this) {
      // step 1: move pipelines that do not have running jobs into gcPipelineMap.
      Map<String, Pipeline> stillRunningPipelineMap = new HashMap<>();
      for (Map.Entry<String, Pipeline> entry : runningPipelineMap.entrySet()) {
        String pipelineId = entry.getKey();
        Pipeline pipeline = entry.getValue();
        // When there is no expected job, the job status may never transition to the
        // canceled state, so we need to check both:
        // (1) all jobs are canceled, or
        // (2) there is no expected job.
        //
        // For example, in the Kafka Consumer Proxy case, a processor might block the
        // fetcher from changing the job status when the prefetch cache is full and the
        // receivers are dead.
        if ((!pipeline.isRunning())
            || allJobsCanceled(pipeline.getJobStatus())
            || pipeline.getJobs().isEmpty()) {
          gcPipelineMap.put(pipeline, pipelineId);
        } else {
          stillRunningPipelineMap.put(pipelineId, pipeline);
        }
      }
      runningPipelineMap.clear();
      runningPipelineMap.putAll(stillRunningPipelineMap);
      stillRunningPipelineMap.clear();
    }

    // step 2: try to close pipelines and collect the ones that have stopped.
    Set<Pipeline> pipelineToDelete = new HashSet<>();
    for (Map.Entry<Pipeline, String> entry : gcPipelineMap.entrySet()) {
      String pipelineId = entry.getValue();
      Pipeline pipeline = entry.getKey();
      Stopwatch closeLatency =
          scope
              .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, pipelineId))
              .timer(Metrics.CLOSE_LATENCY)
              .start();
      boolean closeFailure = false;

      // Keep stopping running pipelines until they are no longer running.
      // Do not stop and remove a pipeline in the same pass: if the stop fails, the
      // pipeline would become unreachable while still occupying resources.
      if (pipeline.isRunning()) {
        try {
          pipeline.stop();
        } catch (Exception e) {
          closeFailure = true;
          scope
              .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, pipelineId))
              .counter(Metrics.CLOSE_FAILURE)
              .inc(1);
          logger.warn(Logging.CLOSE_FAILURE, StructuredLogging.pipelineId(pipelineId), e);
        } finally {
          closeLatency.stop();
          if (!closeFailure) {
            closedCount++;
            scope
                .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, pipelineId))
                .counter(Metrics.CLOSE_SUCCESS)
                .inc(1);
            logger.debug(Logging.CLOSE_SUCCESS, StructuredLogging.pipelineId(pipelineId));
          }
        }
      }
      if (!pipeline.isRunning()) {
        pipelineToDelete.add(pipeline);
      }
    }

    // step 3: remove stopped pipelines.
    for (Pipeline pipeline : pipelineToDelete) {
      gcPipelineMap.remove(pipeline);
      removedCount++;
    }
    pipelineToDelete.clear();

    logger.debug(
        Logging.REMOVE_SUCCESS,
        StructuredLogging.pipelineId("aggregated"),
        StructuredLogging.count(removedCount));
    scope
        .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, "aggregated"))
        .counter(Metrics.REMOVE_SUCCESS)
        .inc(removedCount);

    logger.debug(
        Logging.CLOSE_SUCCESS,
        StructuredLogging.pipelineId("aggregated"),
        StructuredLogging.count(closedCount));
    scope
        .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, "aggregated"))
        .counter(Metrics.CLOSE_SUCCESS)
        .inc(closedCount);
  }
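
The allJobsCanceled helper used in step 1 is defined outside this excerpt. Below is a minimal sketch of its likely shape, assuming JobStatus exposes its current state via getState() and that the state enum includes a canceled value; both are assumptions, not confirmed by this excerpt.

  // Hypothetical sketch only: true when every job reports a canceled state.
  // The empty-collection case is covered by the caller's separate check of
  // pipeline.getJobs().isEmpty().
  private boolean allJobsCanceled(Collection<JobStatus> jobStatuses) {
    for (JobStatus jobStatus : jobStatuses) {
      if (jobStatus.getState() != JobState.JOB_STATE_CANCELED) {
        return false;
      }
    }
    return true;
  }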
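
Note that gcPipelines() is safe to call repeatedly: a pipeline whose stop() fails stays in gcPipelineMap and is retried on the next invocation. The sketch below illustrates driving it from a periodic scheduler; the executor, interval, and wiring are illustrative assumptions, not the actual PipelineManager lifecycle.

  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  // Illustrative only: rerun GC on a fixed delay so pipelines that failed to
  // stop are retried instead of leaking resources.
  ScheduledExecutorService gcExecutor = Executors.newSingleThreadScheduledExecutor();
  gcExecutor.scheduleWithFixedDelay(pipelineManager::gcPipelines, 60, 60, TimeUnit.SECONDS);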