in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToBQDstream.java [152:210]
/**
 * Validates the configuration of this template and logs the effective parameters.
 *
 * <p>The Kafka bootstrap servers, Kafka topic, BigQuery dataset, BigQuery table, project id,
 * temporary GCS bucket, Kafka consumer group id and BigQuery write mode must all be non-blank
 * (not {@code null}, not empty, not whitespace-only). When any of them is blank, the names of
 * the required properties are logged at ERROR level and the method throws. Otherwise the
 * property names and their current values are logged at INFO level.
 *
 * @throws IllegalArgumentException if at least one required parameter is blank
 */
public void validateInput() {
  // isBlank is the single-value form of isAllBlank: same result for one argument,
  // without the varargs array allocation.
  boolean requiredParameterMissing =
      StringUtils.isBlank(kafkaBootstrapServers)
          || StringUtils.isBlank(kafkaTopic)
          || StringUtils.isBlank(bigQueryDataset)
          || StringUtils.isBlank(bigQueryTable)
          || StringUtils.isBlank(projectId)
          || StringUtils.isBlank(tempGcsBucket)
          || StringUtils.isBlank(kafkaGroupId)
          || StringUtils.isBlank(bqWriteMode);

  if (requiredParameterMissing) {
    LOGGER.error(
        "{},{},{},{},{},{},{},{} is required parameter. ",
        PROJECT_ID_PROP,
        KAFKA_BOOTSTRAP_SERVERS,
        KAFKA_TOPIC,
        KAFKA_BQ_DATASET,
        KAFKA_BQ_TABLE,
        KAFKA_BQ_TEMP_GCS_BUCKET,
        KAFKA_BQ_CONSUMER_GROUP_ID,
        KAFKA_BQ_STREAM_OUTPUT_MODE);
    throw new IllegalArgumentException(
        "Required parameters for KafkaToBQDstream not passed. "
            + "Set mandatory parameter for KafkaToBQDstream template "
            + "in resources/conf/template.properties file.");
  }

  // Every entry uses the same "name:value" rendering; the original mixed ':' and ','
  // separators and had no space between the heading and the first entry.
  LOGGER.info(
      "Starting Kafka to BQ via DStream spark job with following parameters: "
          + "1. {}:{} "
          + "2. {}:{} "
          + "3. {}:{} "
          + "4. {}:{} "
          + "5. {}:{} "
          + "6. {}:{} "
          + "7. {}:{} "
          + "8. {}:{} "
          + "9. {}:{} "
          + "10. {}:{} ",
      KAFKA_BOOTSTRAP_SERVERS,
      kafkaBootstrapServers,
      KAFKA_TOPIC,
      kafkaTopic,
      KAFKA_STARTING_OFFSET,
      kafkaStartingOffsets,
      KAFKA_BQ_AWAIT_TERMINATION_TIMEOUT,
      kafkaAwaitTerminationTimeout,
      KAFKA_BQ_DATASET,
      bigQueryDataset,
      KAFKA_BQ_TABLE,
      bigQueryTable,
      KAFKA_BQ_TEMP_GCS_BUCKET,
      tempGcsBucket,
      KAFKA_BQ_BATCH_INTERVAL,
      batchInterval,
      KAFKA_BQ_CONSUMER_GROUP_ID,
      kafkaGroupId,
      KAFKA_BQ_STREAM_OUTPUT_MODE,
      bqWriteMode);
}