public StoredJob newJob()

in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/controller/creator/BatchJobCreator.java [54:115]


  /**
   * Creates a new batch job for a single partition of the given job group.
   *
   * <p>The start/end offset range for the job is resolved in priority order:
   *
   * <ol>
   *   <li>If the partition is empty (low watermark equals high watermark), the watermarks
   *       themselves are used directly.
   *   <li>If the job group carries explicit per-partition offset ranges, those are used.
   *   <li>Otherwise the range is derived from the job group's configured timestamps, capped at the
   *       partition's high watermark.
   * </ol>
   *
   * <p>The whole resolution is wrapped in instrumentation that reports metrics/traces tagged with
   * the Kafka cluster, topic, and partition, and rethrows failures as runtime exceptions.
   *
   * @param storedJobGroup the job group this job belongs to
   * @param jobId identifier to assign to the new job
   * @param partition the Kafka partition the job will consume
   * @return the newly created stored job
   */
  public StoredJob newJob(StoredJobGroup storedJobGroup, long jobId, int partition) {
    final KafkaConsumerTaskGroup taskGroup =
        storedJobGroup.getJobGroup().getKafkaConsumerTaskGroup();
    final String clusterName = taskGroup.getCluster();
    final String topicName = taskGroup.getTopic();
    final String groupName = taskGroup.getConsumerGroup();
    final TopicPartition kafkaPartition = new TopicPartition(topicName, partition);
    return Instrumentation.instrument.withRuntimeException(
        logger,
        infra.scope(),
        infra.tracer(),
        () -> {
          // NOTE(review): the AdminClient is not closed here — presumably adminBuilder caches
          // clients per cluster; confirm before adding a close().
          AdminClient adminClient = adminBuilder.build(clusterName);
          OffsetRange watermarks =
              getPartitionLowAndHighWatermarks(kafkaPartition, groupName, adminClient);

          // Empty partition: low == high watermark, so the watermarks are the offset range.
          if (watermarks.start() == watermarks.end()) {
            return newJob(scope, logger, storedJobGroup, JOB_TYPE, jobId, partition, watermarks);
          }

          // Explicit per-partition offset ranges take precedence over timestamp resolution.
          final boolean hasExplicitOffsets =
              !taskGroup.getPartitionOffsetRanges().getPartitionOffsetRangeList().isEmpty();
          if (hasExplicitOffsets) {
            logger.info(
                "Using partition offsets for job creation",
                StructuredLogging.kafkaTopic(topicName),
                StructuredLogging.kafkaGroup(groupName),
                StructuredLogging.kafkaPartition(partition));
            return newJob(
                scope,
                logger,
                storedJobGroup,
                JOB_TYPE,
                jobId,
                partition,
                getStartEndOffsetsFromPartitionOffsets(
                    taskGroup.getPartitionOffsetRanges(), partition));
          }

          // Fall back to resolving offsets from the job group's configured timestamps.
          return newJob(
              scope,
              logger,
              storedJobGroup,
              JOB_TYPE,
              jobId,
              partition,
              getStartEndOffsetsFromTimestamp(
                  taskGroup, partition, watermarks.end(), adminClient));
        },
        "creator.job.create",
        StructuredFields.KAFKA_CLUSTER,
        clusterName,
        StructuredFields.KAFKA_TOPIC,
        topicName,
        StructuredFields.KAFKA_PARTITION,
        Integer.toString(partition));
  }