in java/src/main/java/com/google/cloud/dataproc/templates/jdbc/JDBCToBigQuery.java [86:130]
  public void runTemplate() {
    SparkSession spark =
        SparkSession.builder()
            .appName("Spark JDBCToBigQuery Job")
            .config("temporaryGcsBucket", temporaryGcsBucket)
            .enableHiveSupport()
            .getOrCreate();
    // Set log level
    spark.sparkContext().setLogLevel(sparkLogLevel);
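    // Assemble JDBC reader options from the template parameters.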
    HashMap<String, String> jdbcProperties = new HashMap<>();
    jdbcProperties.put(JDBCOptions.JDBC_URL(), jdbcURL);
    jdbcProperties.put(JDBCOptions.JDBC_DRIVER_CLASS(), jdbcDriverClassName);
    jdbcProperties.put(JDBCOptions.JDBC_TABLE_NAME(), jdbcSQL);
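    // Parallel read: partition the query on a numeric column between the given bounds.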
    if (StringUtils.isNotBlank(concatedPartitionProps)) {
      jdbcProperties.put(JDBCOptions.JDBC_PARTITION_COLUMN(), jdbcSQLPartitionColumn);
      jdbcProperties.put(JDBCOptions.JDBC_UPPER_BOUND(), jdbcSQLUpperBound);
      jdbcProperties.put(JDBCOptions.JDBC_LOWER_BOUND(), jdbcSQLLowerBound);
      jdbcProperties.put(JDBCOptions.JDBC_NUM_PARTITIONS(), jdbcSQLNumPartitions);
    }
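    // Optional fetch size: number of rows retrieved per round trip to the database.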
    if (StringUtils.isNotBlank(jdbcFetchSize)) {
      jdbcProperties.put(JDBCOptions.JDBC_BATCH_FETCH_SIZE(), jdbcFetchSize);
    }
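    // Optional SQL statement executed on each database session before reading begins.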
    if (StringUtils.isNotBlank(jdbcSessionInitStatement)) {
      jdbcProperties.put(JDBCOptions.JDBC_SESSION_INIT_STATEMENT(), jdbcSessionInitStatement);
    }
    // Read input data from the JDBC table
    Dataset<Row> inputData = spark.read().format("jdbc").options(jdbcProperties).load();
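    // Optionally register the data as a global temp view (resolved under the global_temp
    // database) and apply a transformation query before writing.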
    if (StringUtils.isNotBlank(tempTable) && StringUtils.isNotBlank(tempQuery)) {
      inputData.createOrReplaceGlobalTempView(tempTable);
      inputData = spark.sql(tempQuery);
    }
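    // Write to BigQuery; the connector stages data through the temporary GCS bucket
    // configured on the session above.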
    inputData.write().mode(bqWriteMode).format("bigquery").option("table", bqLocation).save();
  }