in java/src/main/java/com/google/cloud/dataproc/templates/gcs/GCStoBigTable.java [66:120]
  public void runTemplate()
      throws StreamingQueryException, TimeoutException, SQLException, InterruptedException {

    LOGGER.info("Initialize the Spark session");
    SparkSession spark = SparkSession.builder().appName("Spark GCStoBigTable Job").getOrCreate();

    // Set log level
    spark.sparkContext().setLogLevel(gcStoBigTableConfig.getSparkLogLevel());

    LOGGER.info("Read Data");
    Dataset<Row> inputData;
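    // Dispatch on the configured input format; each branch yields the same
    // Dataset<Row>, so the Bigtable write below is format-agnostic.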
    switch (gcStoBigTableConfig.getInputFileFormat()) {
      case GCS_BQ_CSV_FORMAT:
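        // CSV is read with a header row and Spark schema inference enabled
        // (inference costs an extra pass over the input before the actual load).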
        inputData =
            spark
                .read()
                .format(GCS_BQ_CSV_FORMAT)
                .option(GCS_BQ_CSV_HEADER, true)
                .option(GCS_BQ_CSV_INFOR_SCHEMA, true)
                .load(gcStoBigTableConfig.getInputFileLocation());
        break;
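      // Note: Avro is matched on GCS_BQ_AVRO_FORMAT but read via the separate
      // GCS_BQ_AVRO_EXTD_FORMAT constant, which (per its name) appears to identify
      // the external Avro data source implementation on the classpath.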
      case GCS_BQ_AVRO_FORMAT:
        inputData =
            spark
                .read()
                .format(GCS_BQ_AVRO_EXTD_FORMAT)
                .load(gcStoBigTableConfig.getInputFileLocation());
        break;
      case GCS_BQ_PRQT_FORMAT:
        inputData = spark.read().parquet(gcStoBigTableConfig.getInputFileLocation());
        break;
      default:
        throw new IllegalArgumentException(
            "Currently avro, parquet and csv are the only supported formats");
    }
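
    // The Bigtable catalog is a JSON document that maps DataFrame columns to Bigtable
    // column families/qualifiers and designates the row key. Illustrative shape only;
    // the actual contents come from the file at getBigTableCatalogLocation():
    //   {
    //     "table": {"name": "my_table"},
    //     "rowkey": "key",
    //     "columns": {
    //       "key":  {"cf": "rowkey", "col": "key",  "type": "string"},
    //       "name": {"cf": "cf1",    "col": "name", "type": "string"}
    //     }
    //   }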
LOGGER.info("Retrieve Catalog");
String catalog =
getBigTableCatalog(
gcStoBigTableConfig.getProjectId(), gcStoBigTableConfig.getBigTableCatalogLocation());
LOGGER.info("Input File Schema: \n{}", inputData.schema().prettyJson());
LOGGER.info("BigTable Catalog: \n{}", catalog);
LOGGER.info("Write To BigTable");
    inputData
        .write()
        .format(SPARK_BIGTABLE_FORMAT)
        .option(SPARK_BIGTABLE_CATALOG, catalog)
        .option(SPARK_BIGTABLE_PROJECT_ID, gcStoBigTableConfig.getBigTableProjectId())
        .option(SPARK_BIGTABLE_INSTANCE_ID, gcStoBigTableConfig.getBigTableInstanceId())
        .option(SPARK_BIGTABLE_CREATE_NEW_TABLE, gcStoBigTableConfig.getIsCreateBigTable())
        .option(SPARK_BIGTABLE_BATCH_MUTATE_SIZE, gcStoBigTableConfig.getBigTableBatchMutateSize())
        .save();
  }