in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToGCS.java [100:146]
/**
 * Validates the configuration held by this template and logs the effective parameters.
 *
 * <p>The GCS output location, Kafka bootstrap servers, Kafka topic and Kafka message format
 * must all be non-blank. When the message format is {@code "json"}, the Kafka schema URL must
 * be non-blank as well.
 *
 * @throws IllegalArgumentException if any required parameter is blank, or if the message format
 *     is {@code "json"} and no schema URL was supplied
 */
public void validateInput() {
  // A single blank value among the mandatory parameters is enough to reject the configuration.
  if (StringUtils.isAnyBlank(
      gcsOutputLocation, kafkaBootstrapServers, kafkaTopic, kafkaMessageFormat)) {
    LOGGER.error(
        "{},{},{},{} is required parameter. ",
        KAFKA_GCS_OUTPUT_LOCATION,
        KAFKA_BOOTSTRAP_SERVERS,
        KAFKA_TOPIC,
        KAFKA_MESSAGE_FORMAT);
    throw new IllegalArgumentException(
        "Required parameters for KafkaToGCS not passed. "
            + "Set mandatory parameter for KafkaToGCS template "
            + "in resources/conf/template.properties file.");
  }

  // The schema URL is only mandatory for JSON messages; use the short-circuit logical AND so
  // the second check is skipped for every other format.
  if ("json".equals(kafkaMessageFormat) && StringUtils.isBlank(kafkaSchemaUrl)) {
    LOGGER.error("{} is a required parameter for JSON format messages", KAFKA_SCHEMA_URL);
    throw new IllegalArgumentException("Required parameters for KafkaToGCS not passed.");
  }

  // Each entry is separated by a space and uses the same "key:value" layout so the rendered
  // log line stays readable.
  LOGGER.info(
      "Starting Kafka to GCS spark job with following parameters: "
          + "1. {}:{} "
          + "2. {}:{} "
          + "3. {}:{} "
          + "4. {}:{} "
          + "5. {}:{} "
          + "6. {}:{} "
          + "7. {}:{}",
      KAFKA_GCS_OUTPUT_LOCATION,
      gcsOutputLocation,
      KAFKA_GCS_OUTPUT_FORMAT,
      gcsOutputFormat,
      KAFKA_BOOTSTRAP_SERVERS,
      kafkaBootstrapServers,
      KAFKA_TOPIC,
      kafkaTopic,
      KAFKA_STARTING_OFFSET,
      kafkaStartingOffsets,
      KAFKA_GCS_OUTPUT_MODE,
      kafkaOutputMode,
      KAFKA_GCS_AWAIT_TERMINATION_TIMEOUT,
      kafkaAwaitTerminationTimeout);
}