public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/dataplex/DataplexGCStoBQ.java [459:524]


  /**
   * Runs the Dataplex GCS to BigQuery template.
   *
   * <p>Builds a Spark session, resolves the source Dataplex entity's base path and input file
   * format, then either performs a full overwrite load (when the source entity exposes no
   * partition keys) or an incremental load of only the partitions not yet present in the target
   * BigQuery table. New data, if any, is cast to the Dataplex schema, optionally transformed by a
   * user-provided SQL statement, and written to BigQuery.
   *
   * @throws DataprocTemplateException wrapping any failure; the original throwable is attached as
   *     the cause so the full stack trace is preserved for callers
   */
  public void runTemplate() {

    try {
      this.spark = SparkSession.builder().appName("Dataplex GCS to BQ").getOrCreate();

      // Set log level
      this.spark.sparkContext().setLogLevel(sparkLogLevel);

      this.sqlContext = new SQLContext(spark);

      this.sourceEntityUtil = new DataplexEntityUtil(this.sourceEntity);
      this.sourceEntityBasePath = sourceEntityUtil.getBasePathEntityData();
      this.inputFileFormat = sourceEntityUtil.getInputFileFormat();

      // checking for CSV delimiter when applicable
      if (this.inputFileFormat.equals(GCS_BQ_CSV_FORMAT)) {
        this.inputCSVDelimiter = sourceEntityUtil.getInputCSVDelimiter();
      }

      // checking for user provided target table name
      checkTarget();

      // listing source data partitions
      List<String> partitionKeysList = getPartitionKeyList();

      // if source data has no partitions a full load and overwrite is performed
      if (partitionKeysList == null) {
        newDataDS = getDataFrameReader().load(this.sourceEntityBasePath);
        this.sparkSaveMode = SPARK_SAVE_MODE_OVERWRITE;
      } else {
        List<String> partitionsListWithLocationAndKeys =
            sourceEntityUtil.getPartitionsListWithLocationAndKeys();

        // Building dataset with all partitions keys in Dataplex Entity
        Dataset<Row> dataplexPartitionsKeysDS =
            getAllPartitionsDf(partitionsListWithLocationAndKeys, partitionKeysList);

        // Querying BQ for all partition keys currently present in target table
        Dataset<Row> bqPartitionsKeysDS = getBQTargetAvailablePartitionsDf(partitionKeysList);

        // Compare dataplexPartitionsKeysDS and bqPartitionsKeysDS to identify new
        // partitions
        Dataset<Row> newPartitionsPathsDS =
            getNewPartitionsPathsDS(
                partitionKeysList, dataplexPartitionsKeysDS, bqPartitionsKeysDS);

        // load data from each partition
        newDataDS = getNewPartitionsDS(newPartitionsPathsDS);
      }

      // newDataDS may be null when the incremental path found no new partitions to load
      if (newDataDS != null) {
        newDataDS = this.sourceEntityUtil.castDatasetToDataplexSchema(newDataDS);
        newDataDS = applyCustomSql(newDataDS);
        writeToBQ(newDataDS);
      } else {
        LOGGER.info("No new partitions found");
      }

    } catch (Throwable th) {
      LOGGER.error("Exception in DataplexGCStoBQ", th);
      if (Objects.nonNull(spark)) {
        spark.stop();
      }
      // Rethrow with the original throwable chained as the cause. Previously only
      // th.getMessage() was propagated, which discarded the underlying stack trace.
      // initCause(th) is used (rather than a two-arg constructor) so this works with
      // the existing DataprocTemplateException(String) constructor.
      DataprocTemplateException wrapped = new DataprocTemplateException(th.getMessage());
      wrapped.initCause(th);
      throw wrapped;
    }
  }