in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/pipelines/PipelineManager.java [307:406]
/**
 * Garbage-collects pipelines that no longer have work to do.
 *
 * <p>Runs in three steps:
 *
 * <ol>
 *   <li>While holding the monitor of {@code this}, every entry of {@code runningPipelineMap} whose
 *       pipeline is not running, has all jobs canceled, or has no jobs is moved into {@code
 *       gcPipelineMap}.
 *   <li>Outside the lock, {@code stop()} is called on every pipeline in {@code gcPipelineMap} that
 *       is still running. Stop failures are logged and counted as close failures.
 *   <li>Pipelines that report they are no longer running are removed from {@code gcPipelineMap}.
 *       A pipeline that is still running stays there, so stopping it is retried on the next call.
 * </ol>
 *
 * <p>For each stop attempt a per-pipeline close-latency timer and a close success/failure counter
 * are emitted. Once per call, aggregated close-success and remove-success counts are logged and
 * emitted under the pipeline id {@code "aggregated"}.
 */
public void gcPipelines() {
  // Step 1: move pipelines without running work from runningPipelineMap into gcPipelineMap.
  // Candidates are collected in a read-only scan first and the maps are mutated afterwards. As a
  // result, an exception from a Pipeline accessor leaves both maps untouched, and
  // runningPipelineMap is never transiently empty because only collected entries are removed
  // instead of the whole map being rebuilt.
  synchronized (this) {
    Map<String, Pipeline> pipelinesToCollect = new HashMap<>();
    for (Map.Entry<String, Pipeline> entry : runningPipelineMap.entrySet()) {
      Pipeline pipeline = entry.getValue();
      // A pipeline is collected when (1) it is no longer running, (2) all of its jobs are
      // canceled, or (3) it has no expected jobs. Check (3) is needed because job status cannot
      // always be changed to canceled when there is no expected job. For example, in the Kafka
      // Consumer Proxy case a processor might block the fetcher from changing job status when
      // the prefetch cache is full and the receivers are dead.
      if (!pipeline.isRunning()
          || allJobsCanceled(pipeline.getJobStatus())
          || pipeline.getJobs().isEmpty()) {
        pipelinesToCollect.put(entry.getKey(), pipeline);
      }
    }
    for (Map.Entry<String, Pipeline> entry : pipelinesToCollect.entrySet()) {
      runningPipelineMap.remove(entry.getKey());
      gcPipelineMap.put(entry.getValue(), entry.getKey());
    }
  }

  // Step 2: stop pipelines that are still running. This happens outside the lock; presumably
  // this is so a slow stop() does not hold up other synchronized callers.
  // NOTE(review): gcPipelineMap is iterated and mutated here without holding the lock, while
  // step 1 writes it under the lock. This assumes gcPipelines is never run concurrently with
  // itself (or that gcPipelineMap is a concurrent map). Confirm against the field declaration
  // and the scheduler.
  //
  // A pipeline is only dropped once it reports it has stopped. If it were dropped after a failed
  // stop, it could never be reached again while still occupying resources.
  Set<Pipeline> stoppedPipelines = new HashSet<>();
  int closedCount = 0;
  for (Map.Entry<Pipeline, String> entry : gcPipelineMap.entrySet()) {
    Pipeline pipeline = entry.getKey();
    String pipelineId = entry.getValue();
    if (pipeline.isRunning()) {
      Map<String, String> pipelineTags =
          ImmutableMap.of(StructuredLogging.PIPELINE_ID, pipelineId);
      // Started only when a stop is actually attempted, so every started stopwatch is stopped.
      Stopwatch closeLatency = scope.tagged(pipelineTags).timer(Metrics.CLOSE_LATENCY).start();
      boolean stopSucceeded = false;
      try {
        pipeline.stop();
        stopSucceeded = true;
      } catch (Exception e) {
        scope.tagged(pipelineTags).counter(Metrics.CLOSE_FAILURE).inc(1);
        logger.warn(Logging.CLOSE_FAILURE, StructuredLogging.pipelineId(pipelineId), e);
      } finally {
        closeLatency.stop();
      }
      // Success is recorded only when stop() returned normally, so an Error thrown from stop()
      // is not counted as a successful close.
      if (stopSucceeded) {
        closedCount++;
        scope.tagged(pipelineTags).counter(Metrics.CLOSE_SUCCESS).inc(1);
        logger.debug(Logging.CLOSE_SUCCESS, StructuredLogging.pipelineId(pipelineId));
      }
    }
    if (!pipeline.isRunning()) {
      stoppedPipelines.add(pipeline);
    }
  }

  // Step 3: forget pipelines that have stopped.
  for (Pipeline pipeline : stoppedPipelines) {
    gcPipelineMap.remove(pipeline);
  }
  int removedCount = stoppedPipelines.size();

  logger.debug(
      Logging.REMOVE_SUCCESS,
      StructuredLogging.pipelineId("aggregated"),
      StructuredLogging.count(removedCount));
  scope
      .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, "aggregated"))
      .counter(Metrics.REMOVE_SUCCESS)
      .inc(removedCount);
  logger.debug(
      Logging.CLOSE_SUCCESS,
      StructuredLogging.pipelineId("aggregated"),
      StructuredLogging.count(closedCount));
  scope
      .tagged(ImmutableMap.of(StructuredLogging.PIPELINE_ID, "aggregated"))
      .counter(Metrics.CLOSE_SUCCESS)
      .inc(closedCount);
}