public void validateInput()

in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java [147:207]


  public void validateInput() throws IllegalArgumentException {
    if (StringUtils.isAllBlank(gcsOutputLocation)
        || StringUtils.isAllBlank(gcsOutputFormat)
        || StringUtils.isAllBlank(gcsWriteMode)
        || StringUtils.isAllBlank(kafkaBootstrapServers)
        || StringUtils.isAllBlank(kafkaTopic)
        || StringUtils.isAllBlank(kafkaMessageFormat)
        || StringUtils.isAllBlank(kafkaGroupId)) {
      LOGGER.error(
          "{},{},{},{},{},{},{} are required parameter. ",
          KAFKA_GCS_OUTPUT_LOCATION,
          KAFKA_GCS_OUTPUT_FORMAT,
          KAFKA_GCS_WRITE_MODE,
          KAFKA_BOOTSTRAP_SERVERS,
          KAFKA_TOPIC,
          KAFKA_MESSAGE_FORMAT,
          KAFKA_GCS_CONSUMER_GROUP_ID);
      throw new IllegalArgumentException(
          "Required parameters for KafkaTOGCSDstream not passed. "
              + "Set mandatory parameter for KafkaTOGCSDstream template "
              + "in resources/conf/template.properties file.");
    }

    if (kafkaMessageFormat.equals("json") & StringUtils.isAllBlank(kafkaSchemaUrl)) {
      LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
      throw new IllegalArgumentException("Required parameters for KafkaToGCSDstream not passed.");
    }

    LOGGER.info(
        "Starting kafka to GCS via DStream spark job with following parameters:"
            + "1. {}:{} "
            + "2. {}:{} "
            + "3. {}:{} "
            + "4. {}:{} "
            + "5. {}:{} "
            + "6. {}:{} "
            + "7. {}:{} "
            + "8. {}:{} "
            + "9. {}:{} "
            + "10. {}:{} ",
        KAFKA_MESSAGE_FORMAT,
        kafkaMessageFormat,
        KAFKA_GCS_OUTPUT_LOCATION,
        gcsOutputLocation,
        KAFKA_GCS_OUTPUT_FORMAT,
        gcsOutputFormat,
        KAFKA_GCS_WRITE_MODE,
        gcsWriteMode,
        KAFKA_BOOTSTRAP_SERVERS,
        kafkaBootstrapServers,
        KAFKA_TOPIC,
        kafkaTopic,
        KAFKA_GCS_BATCH_INTERVAL,
        batchInterval,
        KAFKA_SCHEMA_URL,
        kafkaSchemaUrl,
        KAFKA_GCS_CONSUMER_GROUP_ID,
        kafkaGroupId,
        KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
        kafkaAwaitTerminationTimeout);
  }