public Pipeline createPipeline()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/PipelineFactoryImpl.java [60:110]


  public Pipeline createPipeline(String pipelineId, Job job) {
    Optional<KafkaFetcher<byte[], byte[]>> fetcher = Optional.empty();
    Optional<ProcessorImpl> processor = Optional.empty();
    Optional<DispatcherImpl> dispatcher = Optional.empty();
    Optional<GrpcDispatcher> grpcDispatcher = Optional.empty();
    Optional<PipelineStateManager> kafkaPipelineStateManager = Optional.empty();
    try {
      kafkaPipelineStateManager = Optional.of(new KafkaPipelineStateManager(job, infra.scope()));
      boolean isSecure = job.hasSecurityConfig() && job.getSecurityConfig().getIsSecure();
      fetcher =
          Optional.of(
              kafkaFetcherFactory.create(job, getThreadName(job, CONSUMER, serviceName), infra));
      String processorId = getThreadName(job, PROCESSOR, serviceName);
      processor = Optional.of(processorFactory.create(job, processorId));
      grpcDispatcher =
          Optional.of(
              grpcDispatcherFactory.create(
                  serviceName,
                  job.getRpcDispatcherTask().getUri(),
                  job.getRpcDispatcherTask().getProcedure()));
      final String clientId = getThreadName(job, PRODUCER, serviceName);
      Optional<KafkaDispatcher<byte[], byte[]>> resqKafkaProducer =
          RetryUtils.hasResqTopic(job)
              ? Optional.of(
                  kafkaDispatcherFactory.create(
                      clientId, job.getResqConfig().getResqCluster(), infra, isSecure, true))
              : Optional.empty();
      dispatcher =
          Optional.of(
              new DispatcherImpl(
                  infra,
                  grpcDispatcher.get(),
                  // DLQ always use lossless producer
                  kafkaDispatcherFactory.create(clientId, DLQ, infra, isSecure, false),
                  resqKafkaProducer));
      return new PipelineImpl(
          pipelineId,
          fetcher.get(),
          processor.get(),
          dispatcher.get(),
          kafkaPipelineStateManager.get());
    } catch (Exception e) {
      LOGGER.info("failed to create pipeline", e);
      kafkaPipelineStateManager.ifPresent(pipelineStateManager -> pipelineStateManager.cancel(job));
      fetcher.ifPresent(KafkaFetcher::stop);
      processor.ifPresent(ProcessorImpl::stop);
      grpcDispatcher.ifPresent(GrpcDispatcher::stop);
      dispatcher.ifPresent(DispatcherImpl::stop);
      throw new RuntimeException(e);
    }
  }