public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/jdbc/JDBCToBigQuery.java [86:130]


  public void runTemplate() {

    SparkSession spark = null;

    spark =
        SparkSession.builder()
            .appName("Spark JDBCToBigQuery Job")
            .config("temporaryGcsBucket", temporaryGcsBucket)
            .enableHiveSupport()
            .getOrCreate();

    // Set log level
    spark.sparkContext().setLogLevel(sparkLogLevel);

    HashMap<String, String> jdbcProperties = new HashMap<>();
    jdbcProperties.put(JDBCOptions.JDBC_URL(), jdbcURL);
    jdbcProperties.put(JDBCOptions.JDBC_DRIVER_CLASS(), jdbcDriverClassName);
    jdbcProperties.put(JDBCOptions.JDBC_URL(), jdbcURL);
    jdbcProperties.put(JDBCOptions.JDBC_TABLE_NAME(), jdbcSQL);

    if (StringUtils.isNotBlank(concatedPartitionProps)) {
      jdbcProperties.put(JDBCOptions.JDBC_PARTITION_COLUMN(), jdbcSQLPartitionColumn);
      jdbcProperties.put(JDBCOptions.JDBC_UPPER_BOUND(), jdbcSQLUpperBound);
      jdbcProperties.put(JDBCOptions.JDBC_LOWER_BOUND(), jdbcSQLLowerBound);
      jdbcProperties.put(JDBCOptions.JDBC_NUM_PARTITIONS(), jdbcSQLNumPartitions);
    }

    if (StringUtils.isNotBlank(jdbcFetchSize)) {
      jdbcProperties.put(JDBCOptions.JDBC_BATCH_FETCH_SIZE(), jdbcFetchSize);
    }

    if (StringUtils.isNotBlank(jdbcSessionInitStatement)) {
      jdbcProperties.put(JDBCOptions.JDBC_SESSION_INIT_STATEMENT(), jdbcSessionInitStatement);
    }

    /** Read Input data from JDBC table */
    Dataset<Row> inputData = spark.read().format("jdbc").options(jdbcProperties).load();

    if (StringUtils.isNotBlank(tempTable) && StringUtils.isNotBlank(tempQuery)) {
      inputData.createOrReplaceGlobalTempView(tempTable);
      inputData = spark.sql(tempQuery);
    }

    inputData.write().mode(bqWriteMode).format("bigquery").option("table", bqLocation).save();
  }