public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/databases/SpannerToGCS.java [51:100]


  public void runTemplate() {

    // Create (or reuse) the SparkSession that executes this template.
    SparkSession spark = SparkSession.builder().appName("DatabaseToGCS Dataproc job").getOrCreate();

    // Set the Spark log level from the template configuration.
    spark.sparkContext().setLogLevel(config.getSparkLogLevel());

    LOGGER.debug("added jars : {}", spark.sparkContext().addedJars().keys());

    LOGGER.info("Spanner JDBC Dialect is {}", config.getSpannerJdbcDialect());
    // Register the Spanner-specific JDBC dialect so Spark generates SQL (e.g., identifier
    // quoting) appropriate for the configured dialect; any other value falls back to
    // Spark's default JDBC dialect.
    switch (config.getSpannerJdbcDialect().toLowerCase()) {
      case SPANNER_GOOGLESQL_JDBC_DIALECT:
        JdbcDialects.registerDialect(new SpannerJdbcDialect());
        break;

      case SPANNER_POSTGRESQL_JDBC_DIALECT:
        JdbcDialects.registerDialect(new SpannerPostgresJDBCDialect());
        break;
    }

    // Base JDBC read options: connection URL, source table (or query), and the Spanner JDBC driver.
    HashMap<String, String> jdbcProperties = new HashMap<>();
    jdbcProperties.put(JDBCOptions.JDBC_URL(), config.getSpannerJdbcUrl());
    jdbcProperties.put(JDBCOptions.JDBC_TABLE_NAME(), config.getInputTableId());
    jdbcProperties.put(JDBCOptions.JDBC_DRIVER_CLASS(), SPANNER_JDBC_DRIVER);

    // If partition properties are supplied, configure a partitioned read
    // (partition column, bounds, and number of partitions) so Spark can load the data in parallel.
    if (StringUtils.isNotBlank(config.getConcatedPartitionProperties())) {
      jdbcProperties.put(JDBCOptions.JDBC_PARTITION_COLUMN(), config.getSqlPartitionColumn());
      jdbcProperties.put(JDBCOptions.JDBC_UPPER_BOUND(), config.getSqlUpperBound());
      jdbcProperties.put(JDBCOptions.JDBC_LOWER_BOUND(), config.getSqlLowerBound());
      jdbcProperties.put(JDBCOptions.JDBC_NUM_PARTITIONS(), config.getSqlNumPartitions());
    }

    // Load the Spanner data into a DataFrame via Spark's JDBC data source.
    Dataset<Row> jdbcDF = spark.read().format("jdbc").options(jdbcProperties).load();

    LOGGER.info("Data load complete from table/query: {}", config.getInputTableId());

    // Optionally register the result as a global temp view and apply a transformation query
    // before writing the output.
    if (StringUtils.isNotBlank(config.getTempTable())
        && StringUtils.isNotBlank(config.getTempQuery())) {
      jdbcDF.createOrReplaceGlobalTempView(config.getTempTable());
      jdbcDF = spark.sql(config.getTempQuery());
    }

    // Write the DataFrame to GCS in the configured output format and save mode.
    DataFrameWriter<Row> writer =
        jdbcDF.write().format(config.getGcsOutputFormat()).mode(config.getGcsWriteMode());

    LOGGER.info("Writing output to GCS location {}", config.getGcsOutputLocation());
    writer.save(config.getGcsOutputLocation());

    // Stop the Spark session once the write completes.
    spark.stop();
  }
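
For reference, JdbcDialects.registerDialect expects an org.apache.spark.sql.jdbc.JdbcDialect
implementation. The real SpannerJdbcDialect and SpannerPostgresJDBCDialect classes live elsewhere
in this repository; the following is only a minimal sketch of what a GoogleSQL-style dialect of
this kind might look like, assuming the standard "jdbc:cloudspanner:" JDBC URL prefix and backtick
identifier quoting. The class name SpannerJdbcDialectSketch is hypothetical.

  import org.apache.spark.sql.jdbc.JdbcDialect;

  // Hypothetical sketch; the repository's actual dialect classes may differ.
  public class SpannerJdbcDialectSketch extends JdbcDialect {

    // Claim JDBC URLs produced by the Cloud Spanner JDBC driver.
    @Override
    public boolean canHandle(String url) {
      return url.toLowerCase().startsWith("jdbc:cloudspanner:");
    }

    // GoogleSQL quotes identifiers with backticks rather than double quotes.
    @Override
    public String quoteIdentifier(String colName) {
      return "`" + colName + "`";
    }
  }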