in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCSDstream.java [147:207]
/**
 * Validates the template's required configuration before the job starts.
 *
 * <p>All mandatory Kafka/GCS properties must be non-blank; additionally, when the message
 * format is {@code json}, a schema URL is required to deserialize the payload.
 *
 * @throws IllegalArgumentException if any mandatory property is blank, or if the message
 *     format is {@code json} and no schema URL was provided
 */
public void validateInput() throws IllegalArgumentException {
  // isAnyBlank is varargs: true when at least one of the mandatory properties is blank.
  if (StringUtils.isAnyBlank(
      gcsOutputLocation,
      gcsOutputFormat,
      gcsWriteMode,
      kafkaBootstrapServers,
      kafkaTopic,
      kafkaMessageFormat,
      kafkaGroupId)) {
    LOGGER.error(
        "{},{},{},{},{},{},{} are required parameters. ",
        KAFKA_GCS_OUTPUT_LOCATION,
        KAFKA_GCS_OUTPUT_FORMAT,
        KAFKA_GCS_WRITE_MODE,
        KAFKA_BOOTSTRAP_SERVERS,
        KAFKA_TOPIC,
        KAFKA_MESSAGE_FORMAT,
        KAFKA_GCS_CONSUMER_GROUP_ID);
    throw new IllegalArgumentException(
        "Required parameters for KafkaToGCSDstream not passed. "
            + "Set mandatory parameter for KafkaToGCSDstream template "
            + "in resources/conf/template.properties file.");
  }
  // JSON messages cannot be parsed without a schema; kafkaMessageFormat is already
  // known non-blank here, so the equals() call is safe. Use && (short-circuit), not &.
  if (kafkaMessageFormat.equals("json") && StringUtils.isBlank(kafkaSchemaUrl)) {
    LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
    throw new IllegalArgumentException("Required parameters for KafkaToGCSDstream not passed.");
  }
  LOGGER.info(
      "Starting kafka to GCS via DStream spark job with following parameters:"
          + "1. {}:{} "
          + "2. {}:{} "
          + "3. {}:{} "
          + "4. {}:{} "
          + "5. {}:{} "
          + "6. {}:{} "
          + "7. {}:{} "
          + "8. {}:{} "
          + "9. {}:{} "
          + "10. {}:{} ",
      KAFKA_MESSAGE_FORMAT,
      kafkaMessageFormat,
      KAFKA_GCS_OUTPUT_LOCATION,
      gcsOutputLocation,
      KAFKA_GCS_OUTPUT_FORMAT,
      gcsOutputFormat,
      KAFKA_GCS_WRITE_MODE,
      gcsWriteMode,
      KAFKA_BOOTSTRAP_SERVERS,
      kafkaBootstrapServers,
      KAFKA_TOPIC,
      kafkaTopic,
      KAFKA_GCS_BATCH_INTERVAL,
      batchInterval,
      KAFKA_SCHEMA_URL,
      kafkaSchemaUrl,
      KAFKA_GCS_CONSUMER_GROUP_ID,
      kafkaGroupId,
      KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
      kafkaAwaitTerminationTimeout);
}