in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/PipelineFactoryImpl.java [60:110]
public Pipeline createPipeline(String pipelineId, Job job) {
Optional<KafkaFetcher<byte[], byte[]>> fetcher = Optional.empty();
Optional<ProcessorImpl> processor = Optional.empty();
Optional<DispatcherImpl> dispatcher = Optional.empty();
Optional<GrpcDispatcher> grpcDispatcher = Optional.empty();
Optional<PipelineStateManager> kafkaPipelineStateManager = Optional.empty();
try {
kafkaPipelineStateManager = Optional.of(new KafkaPipelineStateManager(job, infra.scope()));
boolean isSecure = job.hasSecurityConfig() && job.getSecurityConfig().getIsSecure();
fetcher =
Optional.of(
kafkaFetcherFactory.create(job, getThreadName(job, CONSUMER, serviceName), infra));
String processorId = getThreadName(job, PROCESSOR, serviceName);
processor = Optional.of(processorFactory.create(job, processorId));
grpcDispatcher =
Optional.of(
grpcDispatcherFactory.create(
serviceName,
job.getRpcDispatcherTask().getUri(),
job.getRpcDispatcherTask().getProcedure()));
final String clientId = getThreadName(job, PRODUCER, serviceName);
Optional<KafkaDispatcher<byte[], byte[]>> resqKafkaProducer =
RetryUtils.hasResqTopic(job)
? Optional.of(
kafkaDispatcherFactory.create(
clientId, job.getResqConfig().getResqCluster(), infra, isSecure, true))
: Optional.empty();
dispatcher =
Optional.of(
new DispatcherImpl(
infra,
grpcDispatcher.get(),
// DLQ always use lossless producer
kafkaDispatcherFactory.create(clientId, DLQ, infra, isSecure, false),
resqKafkaProducer));
return new PipelineImpl(
pipelineId,
fetcher.get(),
processor.get(),
dispatcher.get(),
kafkaPipelineStateManager.get());
} catch (Exception e) {
LOGGER.info("failed to create pipeline", e);
kafkaPipelineStateManager.ifPresent(pipelineStateManager -> pipelineStateManager.cancel(job));
fetcher.ifPresent(KafkaFetcher::stop);
processor.ifPresent(ProcessorImpl::stop);
grpcDispatcher.ifPresent(GrpcDispatcher::stop);
dispatcher.ifPresent(DispatcherImpl::stop);
throw new RuntimeException(e);
}
}