in java/src/main/java/com/google/cloud/dataproc/templates/databases/SpannerToGCS.java [51:100]
  public void runTemplate() {
    SparkSession spark =
        SparkSession.builder().appName("DatabaseToGCS Dataproc job").getOrCreate();

    // Set log level
    spark.sparkContext().setLogLevel(config.getSparkLogLevel());
    LOGGER.debug("Added jars: {}", spark.sparkContext().addedJars().keys());

    // Register the Spark JDBC dialect matching the Spanner database dialect
    // (GoogleSQL or PostgreSQL) so identifiers and types are handled correctly.
    LOGGER.info("Spanner JDBC Dialect is {}", config.getSpannerJdbcDialect());
    switch (config.getSpannerJdbcDialect().toLowerCase()) {
      case SPANNER_GOOGLESQL_JDBC_DIALECT:
        JdbcDialects.registerDialect(new SpannerJdbcDialect());
        break;
      case SPANNER_POSTGRESQL_JDBC_DIALECT:
        JdbcDialects.registerDialect(new SpannerPostgresJDBCDialect());
        break;
    }

    // Build the JDBC read options: connection URL, source table/query and driver class.
    HashMap<String, String> jdbcProperties = new HashMap<>();
    jdbcProperties.put(JDBCOptions.JDBC_URL(), config.getSpannerJdbcUrl());
    jdbcProperties.put(JDBCOptions.JDBC_TABLE_NAME(), config.getInputTableId());
    jdbcProperties.put(JDBCOptions.JDBC_DRIVER_CLASS(), SPANNER_JDBC_DRIVER);

    // If partition properties are supplied, configure a parallel (partitioned) JDBC read.
    if (StringUtils.isNotBlank(config.getConcatedPartitionProperties())) {
      jdbcProperties.put(JDBCOptions.JDBC_PARTITION_COLUMN(), config.getSqlPartitionColumn());
      jdbcProperties.put(JDBCOptions.JDBC_UPPER_BOUND(), config.getSqlUpperBound());
      jdbcProperties.put(JDBCOptions.JDBC_LOWER_BOUND(), config.getSqlLowerBound());
      jdbcProperties.put(JDBCOptions.JDBC_NUM_PARTITIONS(), config.getSqlNumPartitions());
    }

    // Load the Spanner data into a DataFrame through the Spark JDBC data source.
    Dataset<Row> jdbcDF = spark.read().format("jdbc").options(jdbcProperties).load();
    LOGGER.info("Data load complete from table/query: {}", config.getInputTableId());

    // Optionally apply an intermediate SQL transformation before writing out.
    if (StringUtils.isNotBlank(config.getTempTable())
        && StringUtils.isNotBlank(config.getTempQuery())) {
      jdbcDF.createOrReplaceGlobalTempView(config.getTempTable());
      jdbcDF = spark.sql(config.getTempQuery());
    }

    // Write the result to the GCS bucket in the configured format and write mode.
    DataFrameWriter<Row> writer =
        jdbcDF.write().format(config.getGcsOutputFormat()).mode(config.getGcsWriteMode());
    LOGGER.info("Start writing to GCS bucket {}", config.getGcsOutputLocation());
    writer.save(config.getGcsOutputLocation());

    spark.stop();
  }
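
The switch above registers a custom Spark JdbcDialect before the read. As a minimal sketch of what such a dialect involves (hypothetical class name and URL prefix; the real SpannerJdbcDialect and SpannerPostgresJDBCDialect in this repo may differ), a dialect mainly declares which JDBC URLs it handles and how identifiers are quoted:

import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcDialects;

// Hypothetical illustration only; not the dialect shipped with this template.
public class ExampleSpannerDialect extends JdbcDialect {

  @Override
  public boolean canHandle(String url) {
    // Claim JDBC URLs that target Cloud Spanner (assumed URL prefix).
    return url.toLowerCase().startsWith("jdbc:cloudspanner:");
  }

  @Override
  public String quoteIdentifier(String colName) {
    // GoogleSQL quotes identifiers with backticks rather than double quotes.
    return "`" + colName + "`";
  }
}

// Registering the dialect makes Spark pick it for matching JDBC URLs:
// JdbcDialects.registerDialect(new ExampleSpannerDialect());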