in java/src/main/java/com/google/cloud/dataproc/templates/dataplex/DataplexGCStoBQ.java [459:524]
/**
 * Runs the Dataplex GCS to BigQuery template end-to-end: creates the Spark session, resolves
 * source-entity metadata from Dataplex, determines which data (full load or only new partitions)
 * must be moved, and writes it to the BigQuery target.
 *
 * <p>Flow:
 * <ol>
 *   <li>Build SparkSession and resolve source entity base path / input format via
 *       {@code DataplexEntityUtil}.</li>
 *   <li>If the source has no partition keys, read everything and overwrite the target.</li>
 *   <li>Otherwise, diff partition keys present in Dataplex against those already in the BQ
 *       target and load only the new partitions.</li>
 *   <li>Cast the data to the Dataplex schema, apply any custom SQL, and write to BigQuery.</li>
 * </ol>
 *
 * @throws DataprocTemplateException wrapping any failure; the original throwable is attached
 *     as the cause so the full stack trace is preserved for the caller/driver logs.
 */
public void runTemplate() {
  try {
    this.spark = SparkSession.builder().appName("Dataplex GCS to BQ").getOrCreate();
    // Set log level
    this.spark.sparkContext().setLogLevel(sparkLogLevel);
    this.sqlContext = new SQLContext(spark);
    this.sourceEntityUtil = new DataplexEntityUtil(this.sourceEntity);
    this.sourceEntityBasePath = sourceEntityUtil.getBasePathEntityData();
    this.inputFileFormat = sourceEntityUtil.getInputFileFormat();
    // CSV is the only format that needs an explicit delimiter from the entity metadata
    if (this.inputFileFormat.equals(GCS_BQ_CSV_FORMAT)) {
      this.inputCSVDelimiter = sourceEntityUtil.getInputCSVDelimiter();
    }
    // checking for user provided target table name
    checkTarget();
    // listing source data partitions
    List<String> partitionKeysList = getPartitionKeyList();
    // if source data has no partitions a full load and overwrite is performed
    if (partitionKeysList == null) {
      newDataDS = getDataFrameReader().load(this.sourceEntityBasePath);
      this.sparkSaveMode = SPARK_SAVE_MODE_OVERWRITE;
    } else {
      List<String> partitionsListWithLocationAndKeys =
          sourceEntityUtil.getPartitionsListWithLocationAndKeys();
      // Building dataset with all partitions keys in Dataplex Entity
      Dataset<Row> dataplexPartitionsKeysDS =
          getAllPartitionsDf(partitionsListWithLocationAndKeys, partitionKeysList);
      // Querying BQ for all partition keys currently present in target table
      Dataset<Row> bqPartitionsKeysDS = getBQTargetAvailablePartitionsDf(partitionKeysList);
      // Compare dataplexPartitionsKeysDS and bqPartitionsKeysDS to identify new partitions
      Dataset<Row> newPartitionsPathsDS =
          getNewPartitionsPathsDS(
              partitionKeysList, dataplexPartitionsKeysDS, bqPartitionsKeysDS);
      // load data from each partition
      newDataDS = getNewPartitionsDS(newPartitionsPathsDS);
    }
    // newDataDS is null when every Dataplex partition is already present in the target
    if (newDataDS != null) {
      newDataDS = this.sourceEntityUtil.castDatasetToDataplexSchema(newDataDS);
      newDataDS = applyCustomSql(newDataDS);
      writeToBQ(newDataDS);
    } else {
      LOGGER.info("No new partitions found");
    }
  } catch (Throwable th) {
    // Boundary catch: this is the template entry point, so any failure is logged and the
    // Spark session is shut down before rethrowing.
    LOGGER.error("Exception in DataplexGCStoBQ", th);
    if (Objects.nonNull(spark)) {
      spark.stop();
    }
    // Preserve the original stack trace by chaining the cause instead of rethrowing only the
    // message. initCause is used rather than a (String, Throwable) constructor so this works
    // regardless of which constructors DataprocTemplateException declares.
    // Fall back to toString() because getMessage() may be null (e.g. bare NPE).
    String message = th.getMessage() != null ? th.getMessage() : th.toString();
    DataprocTemplateException wrapped = new DataprocTemplateException(message);
    wrapped.initCause(th);
    throw wrapped;
  }
}