in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToPubSub.java [87:129]
public void validateInput() {
if (StringUtils.isAllBlank(pubsubCheckpointLocation)
|| StringUtils.isAllBlank(kafkaBootstrapServers)
|| StringUtils.isAllBlank(kafkaTopic)
|| StringUtils.isAllBlank(pubsubProject)
|| StringUtils.isAllBlank(pubsubTopic)) {
LOGGER.error(
"{},{},{},{},{} are required parameters. ",
KAFKA_PUBSUB_BOOTSTRAP_SERVERS,
KAFKA_PUBSUB_INPUT_TOPIC,
KAFKA_PUBSUB_OUTPUT_TOPIC,
KAFKA_PUBSUB_OUTPUT_PROJECT_ID,
KAFKA_PUBSUB_CHECKPOINT_LOCATION);
throw new IllegalArgumentException(
"Required parameters for KafkaToPubSub not passed. "
+ "Set mandatory parameter for KafkaToPubSub template "
+ "in resources/conf/template.properties file.");
}
LOGGER.info(
"Starting Kafka to PubSub spark job with following parameters:\n"
+ "1. {}:{}\n"
+ "2. {}:{}\n"
+ "3. {}:{}\n"
+ "4. {}:{}\n"
+ "5. {}:{}\n"
+ "6. {}:{}\n"
+ "7. {}:{}\n",
KAFKA_PUBSUB_BOOTSTRAP_SERVERS,
kafkaBootstrapServers,
KAFKA_PUBSUB_INPUT_TOPIC,
kafkaTopic,
KAFKA_PUBSUB_OUTPUT_TOPIC,
pubsubTopic,
KAFKA_PUBSUB_OUTPUT_PROJECT_ID,
pubsubProject,
KAFKA_PUBSUB_CHECKPOINT_LOCATION,
pubsubCheckpointLocation,
KAFKA_PUBSUB_STARTING_OFFSET,
kafkaStartingOffsets,
KAFKA_PUBSUB_AWAIT_TERMINATION_TIMEOUT,
kafkaAwaitTerminationTimeout);
}