in java/src/main/java/com/google/cloud/dataproc/templates/s3/S3ToBigQuery.java [59:112]
public void runTemplate() {
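// Read files from S3 into a DataFrame and load them into a BigQuery table.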
SparkSession spark = SparkSession.builder().appName("S3 to BigQuery load").getOrCreate();
// Set log level
spark.sparkContext().setLogLevel(sparkLogLevel);
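// Hand the AWS credentials and the S3 endpoint to Hadoop so Spark can read from S3.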
spark.sparkContext().hadoopConfiguration().set(S3_BQ_ACCESS_KEY_CONFIG_NAME, accessKey);
spark.sparkContext().hadoopConfiguration().set(S3_BQ_SECRET_KEY_CONFIG_NAME, accessSecret);
spark.sparkContext().hadoopConfiguration().set(S3_BQ_ENDPOINT_CONFIG_NAME, S3_BQ_ENDPOINT_CONFIG_VALUE);
Dataset<Row> inputData = null;
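// Route on the configured input format and read the source files into the DataFrame.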
switch (inputFileFormat) {
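// CSV: treat the first row as a header and let Spark infer column types.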
case GCS_BQ_CSV_FORMAT:
inputData =
spark
.read()
.format(GCS_BQ_CSV_FORMAT)
.option(S3_BQ_HEADER, true)
.option(S3_BQ_INFER_SCHEMA, true)
.load(inputFileLocation);
break;
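// Avro: read through the extended Avro data source.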
case S3_BQ_AVRO_FORMAT:
inputData = spark.read().format(GCS_BQ_AVRO_EXTD_FORMAT).load(inputFileLocation);
break;
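// Parquet: the schema is self-describing, so a plain read suffices.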
case S3_BQ_PRQT_FORMAT:
inputData = spark.read().parquet(inputFileLocation);
break;
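// JSON: infer the schema from the records.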
case S3_BQ_JSON_FORMAT:
inputData =
spark
.read()
.format(S3_BQ_JSON_FORMAT)
.option(S3_BQ_INFER_SCHEMA, true)
.load(inputFileLocation);
break;
default:
throw new IllegalArgumentException(
"Currenlty avro, parquet, json and csv are the only supported formats");
}
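// Write the DataFrame to BigQuery, staging the load through the temporary GCS bucket.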
inputData
.write()
.format(S3_BQ_OUTPUT_FORMAT)
.option(S3_BQ_OUTPUT, bigQueryDataset + "." + bigQueryTable)
.option(S3_BQ_TEMP_BUCKET, bqTempBucket)
.mode(SaveMode.valueOf(outputMode))
.save();
}
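
// Hedged sketch, not part of the original template: a minimal standalone
// equivalent of the CSV path above. It assumes the S3_BQ_* constants resolve
// to the standard Hadoop S3A keys ("fs.s3a.access.key", "fs.s3a.secret.key",
// "fs.s3a.endpoint") and that the output format is the Spark BigQuery
// connector ("bigquery") with its "table" and "temporaryGcsBucket" options.
// All literals below, and the method name itself, are assumptions for
// illustration, not values read from this file.
static void loadCsvFromS3ToBigQuerySketch(
String accessKey, String accessSecret, String inputPath, String table, String tempBucket) {
SparkSession spark = SparkSession.builder().appName("S3 to BigQuery sketch").getOrCreate();
// Assumed S3A configuration keys; the template reads these names from constants.
spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", accessKey);
spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", accessSecret);
spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
// Read CSV with a header row, inferring the schema; inputPath is e.g. "s3a://bucket/path/".
Dataset<Row> df =
spark.read().format("csv").option("header", true).option("inferSchema", true).load(inputPath);
// Write via the Spark BigQuery connector, staging through the temporary GCS bucket.
df.write()
.format("bigquery")
.option("table", table)
.option("temporaryGcsBucket", tempBucket)
.mode(SaveMode.Append)
.save();
}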