public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/gcs/GCStoBigTable.java [66:120]


  public void runTemplate()
      throws StreamingQueryException, TimeoutException, SQLException, InterruptedException {

    LOGGER.info("Initialize the Spark session");
    SparkSession spark = SparkSession.builder().appName("Spark GCStoBigTable Job").getOrCreate();

    // Set log level
    spark.sparkContext().setLogLevel(gcStoBigTableConfig.getSparkLogLevel());

    LOGGER.info("Read Data");
    Dataset<Row> inputData;
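    // Load the source files from GCS in the configured format; CSV reads assume a
    // header row and let Spark infer the schema.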
    switch (gcStoBigTableConfig.getInputFileFormat()) {
      case GCS_BQ_CSV_FORMAT:
        inputData =
            spark
                .read()
                .format(GCS_BQ_CSV_FORMAT)
                .option(GCS_BQ_CSV_HEADER, true)
                .option(GCS_BQ_CSV_INFOR_SCHEMA, true)
                .load(gcStoBigTableConfig.getInputFileLocation());
        break;
      case GCS_BQ_AVRO_FORMAT:
        inputData =
            spark
                .read()
                .format(GCS_BQ_AVRO_EXTD_FORMAT)
                .load(gcStoBigTableConfig.getInputFileLocation());
        break;
      case GCS_BQ_PRQT_FORMAT:
        inputData = spark.read().parquet(gcStoBigTableConfig.getInputFileLocation());
        break;
      default:
        throw new IllegalArgumentException(
            "Currently Avro, Parquet and CSV are the only supported input file formats");
    }

    LOGGER.info("Retrieve Catalog");
    String catalog =
        getBigTableCatalog(
            gcStoBigTableConfig.getProjectId(), gcStoBigTableConfig.getBigTableCatalogLocation());

    LOGGER.info("Input File Schema:  \n{}", inputData.schema().prettyJson());
    LOGGER.info("BigTable Catalog:  \n{}", catalog);

    LOGGER.info("Write To BigTable");
    inputData
        .write()
        .format(SPARK_BIGTABLE_FORMAT)
        .option(SPARK_BIGTABLE_CATALOG, catalog)
        .option(SPARK_BIGTABLE_PROJECT_ID, gcStoBigTableConfig.getBigTableProjectId())
        .option(SPARK_BIGTABLE_INSTANCE_ID, gcStoBigTableConfig.getBigTableInstanceId())
        .option(SPARK_BIGTABLE_CREATE_NEW_TABLE, gcStoBigTableConfig.getIsCreateBigTable())
        .option(SPARK_BIGTABLE_BATCH_MUTATE_SIZE, gcStoBigTableConfig.getBigTableBatchMutateSize())
        .save();
  }
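
The write path depends on the catalog string returned by getBigTableCatalog, which the spark-bigtable connector uses to map DataFrame columns onto a Bigtable row key and column families. A minimal sketch of such a catalog, with a purely hypothetical table name, row key and column mappings, could be assembled as a Java string like this:

    // Illustrative only: "my_table", "id" and "name" are hypothetical. The keys under
    // "columns" must match the input DataFrame's column names; the entry mapped to the
    // "rowkey" column family becomes the Bigtable row key, the rest become column
    // family/qualifier pairs.
    String catalog =
        "{"
            + "\"table\":{\"name\":\"my_table\"},"
            + "\"rowkey\":\"id\","
            + "\"columns\":{"
            + "\"id\":{\"cf\":\"rowkey\",\"col\":\"id\",\"type\":\"string\"},"
            + "\"name\":{\"cf\":\"cf1\",\"col\":\"name\",\"type\":\"string\"}"
            + "}}";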