public void validateInput()

in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCS.java [100:146]


  public void validateInput() {
    if (StringUtils.isAllBlank(gcsOutputLocation)
        || StringUtils.isAllBlank(kafkaBootstrapServers)
        || StringUtils.isAllBlank(kafkaTopic)
        || StringUtils.isAllBlank(kafkaMessageFormat)) {
      LOGGER.error(
          "{},{},{},{} is required parameter. ",
          KAFKA_GCS_OUTPUT_LOCATION,
          KAFKA_BOOTSTRAP_SERVERS,
          KAFKA_TOPIC,
          KAFKA_MESSAGE_FORMAT);
      throw new IllegalArgumentException(
          "Required parameters for KafkaToGCS not passed. "
              + "Set mandatory parameter for KafkaToGCS template "
              + "in resources/conf/template.properties file.");
    }

    if (kafkaMessageFormat.equals("json") & StringUtils.isAllBlank(kafkaSchemaUrl)) {
      LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
      throw new IllegalArgumentException("Required parameters for KafkaToGCS not passed.");
    }

    SparkSession spark = null;
    LOGGER.info(
        "Starting Kafka to GCS spark job with following parameters:"
            + "1. {}:{}"
            + "2. {}:{}"
            + "3. {}:{}"
            + "4. {},{}"
            + "5. {},{}"
            + "6. {},{}"
            + "7, {},{}",
        KAFKA_GCS_OUTPUT_LOCATION,
        gcsOutputLocation,
        KAFKA_GCS_OUTPUT_FORMAT,
        gcsOutputFormat,
        KAFKA_BOOTSTRAP_SERVERS,
        kafkaBootstrapServers,
        KAFKA_TOPIC,
        kafkaTopic,
        KAFKA_STARTING_OFFSET,
        kafkaStartingOffsets,
        KAFKA_GCS_OUTPUT_MODE,
        kafkaOutputMode,
        KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
        kafkaAwaitTerminationTimeout);
  }