public void validateInput()

in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToPubSub.java [87:129]


  public void validateInput() {
    if (StringUtils.isAllBlank(pubsubCheckpointLocation)
        || StringUtils.isAllBlank(kafkaBootstrapServers)
        || StringUtils.isAllBlank(kafkaTopic)
        || StringUtils.isAllBlank(pubsubProject)
        || StringUtils.isAllBlank(pubsubTopic)) {
      LOGGER.error(
          "{},{},{},{},{} are required parameters. ",
          KAFKA_PUBSUB_BOOTSTRAP_SERVERS,
          KAFKA_PUBSUB_INPUT_TOPIC,
          KAFKA_PUBSUB_OUTPUT_TOPIC,
          KAFKA_PUBSUB_OUTPUT_PROJECT_ID,
          KAFKA_PUBSUB_CHECKPOINT_LOCATION);
      throw new IllegalArgumentException(
          "Required parameters for KafkaToPubSub not passed. "
              + "Set mandatory parameter for KafkaToPubSub template "
              + "in resources/conf/template.properties file.");
    }

    LOGGER.info(
        "Starting Kafka to PubSub spark job with following parameters:\n"
            + "1. {}:{}\n"
            + "2. {}:{}\n"
            + "3. {}:{}\n"
            + "4. {}:{}\n"
            + "5. {}:{}\n"
            + "6. {}:{}\n"
            + "7. {}:{}\n",
        KAFKA_PUBSUB_BOOTSTRAP_SERVERS,
        kafkaBootstrapServers,
        KAFKA_PUBSUB_INPUT_TOPIC,
        kafkaTopic,
        KAFKA_PUBSUB_OUTPUT_TOPIC,
        pubsubTopic,
        KAFKA_PUBSUB_OUTPUT_PROJECT_ID,
        pubsubProject,
        KAFKA_PUBSUB_CHECKPOINT_LOCATION,
        pubsubCheckpointLocation,
        KAFKA_PUBSUB_STARTING_OFFSET,
        kafkaStartingOffsets,
        KAFKA_PUBSUB_AWAIT_TERMINATION_TIMEOUT,
        kafkaAwaitTerminationTimeout);
  }