in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToBQ.java [102:151]
/**
 * Checks that every mandatory KafkaToBQ setting has been supplied, then logs the effective
 * configuration of the job.
 *
 * <p>The mandatory values are the checkpoint location, Kafka bootstrap servers, Kafka topic,
 * BigQuery dataset, BigQuery table, project id and temporary GCS bucket. The starting offsets
 * and await-termination timeout are logged but not validated here.
 *
 * @throws IllegalArgumentException if any mandatory value is reported blank by
 *     {@code StringUtils.isAllBlank}
 */
public void validateInput() {
  // isAllBlank is called with a single argument each time, so each call checks one value.
  boolean missingRequired =
      StringUtils.isAllBlank(checkpointLocation)
          || StringUtils.isAllBlank(kafkaBootstrapServers)
          || StringUtils.isAllBlank(kafkaTopic)
          || StringUtils.isAllBlank(bigQueryDataset)
          || StringUtils.isAllBlank(bigQueryTable)
          || StringUtils.isAllBlank(projectId)
          || StringUtils.isAllBlank(tempGcsBucket);

  if (missingRequired) {
    // Log the property keys (not the values) so the operator knows what must be set.
    LOGGER.error(
        "{},{},{},{},{},{},{} is required parameter. ",
        PROJECT_ID_PROP,
        KAFKA_BQ_CHECKPOINT_LOCATION,
        KAFKA_BQ_BOOTSTRAP_SERVERS,
        KAFKA_BQ_TOPIC,
        KAFKA_BQ_DATASET,
        KAFKA_BQ_TABLE,
        KAFKA_BQ_TEMP_GCS_BUCKET);
    throw new IllegalArgumentException(
        "Required parameters for KafkaToBQ not passed. "
            + "Set mandatory parameter for KafkaToBQ template "
            + "in resources/conf/template.properties file.");
  }

  // Each numbered entry is one "property key:value" pair. The number of placeholders must
  // match the number of arguments: 8 entries -> 16 placeholders -> 16 arguments.
  LOGGER.info(
      "Starting Kafka to BQ spark job with following parameters: "
          + "1. {}:{} "
          + "2. {}:{} "
          + "3. {}:{} "
          + "4. {}:{} "
          + "5. {}:{} "
          + "6. {}:{} "
          + "7. {}:{} "
          + "8. {}:{} ",
      KAFKA_BQ_CHECKPOINT_LOCATION,
      checkpointLocation,
      KAFKA_BQ_BOOTSTRAP_SERVERS,
      kafkaBootstrapServers,
      KAFKA_BQ_TOPIC,
      kafkaTopic,
      KAFKA_BQ_STARTING_OFFSET,
      kafkaStartingOffsets,
      KAFKA_BQ_AWAIT_TERMINATION_TIMEOUT,
      kafkaAwaitTerminationTimeout,
      KAFKA_BQ_DATASET,
      bigQueryDataset,
      KAFKA_BQ_TABLE,
      bigQueryTable,
      KAFKA_BQ_TEMP_GCS_BUCKET,
      tempGcsBucket,
      PROJECT_ID_PROP,
      projectId);
}