public void validateInput()

in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToBQ.java [102:151]


  public void validateInput() {
    if (StringUtils.isAllBlank(checkpointLocation)
        || StringUtils.isAllBlank(kafkaBootstrapServers)
        || StringUtils.isAllBlank(kafkaTopic)
        || StringUtils.isAllBlank(bigQueryDataset)
        || StringUtils.isAllBlank(bigQueryTable)
        || StringUtils.isAllBlank(projectId)
        || StringUtils.isAllBlank(tempGcsBucket)) {
      LOGGER.error(
          "{},{},{},{},{},{},{} is required parameter. ",
          PROJECT_ID_PROP,
          KAFKA_BQ_CHECKPOINT_LOCATION,
          KAFKA_BQ_BOOTSTRAP_SERVERS,
          KAFKA_BQ_TOPIC,
          KAFKA_BQ_DATASET,
          KAFKA_BQ_TABLE,
          KAFKA_BQ_TEMP_GCS_BUCKET);
      throw new IllegalArgumentException(
          "Required parameters for KafkaToBQ not passed. "
              + "Set mandatory parameter for KafkaToBQ template "
              + "in resources/conf/template.properties file.");
    }

    LOGGER.info(
        "Starting Kafka to BQ spark job with following parameters:"
            + "1. {}:{} "
            + "2. {}:{} "
            + "3. {}:{} "
            + "4. {},{} "
            + "5. {},{} "
            + "6. {},{} "
            + "7. {},{} "
            + "8. {},{} ",
        KAFKA_BQ_CHECKPOINT_LOCATION,
        checkpointLocation,
        KAFKA_BQ_BOOTSTRAP_SERVERS,
        kafkaBootstrapServers,
        KAFKA_BQ_TOPIC,
        kafkaTopic,
        KAFKA_BQ_STARTING_OFFSET,
        kafkaStartingOffsets,
        KAFKA_BQ_AWAIT_TERMINATION_TIMEOUT,
        kafkaAwaitTerminationTimeout,
        KAFKA_BQ_DATASET,
        bigQueryDataset,
        KAFKA_BQ_TABLE,
        bigQueryTable,
        KAFKA_BQ_TEMP_GCS_BUCKET,
        tempGcsBucket);
  }