public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/s3/S3ToBigQuery.java [59:112]


  public void runTemplate() {

    SparkSession spark = SparkSession.builder().appName("S3 to BigQuery load").getOrCreate();

    // Set log level
    spark.sparkContext().setLogLevel(sparkLogLevel);

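    // Register the S3 access key, secret key, and endpoint with the Hadoop configuration
    // so Spark can read the input directly from S3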
    spark.sparkContext().hadoopConfiguration().set(S3_BQ_ACCESS_KEY_CONFIG_NAME, accessKey);
    spark.sparkContext().hadoopConfiguration().set(S3_BQ_SECRET_KEY_CONFIG_NAME, accessSecret);
    spark
        .sparkContext()
        .hadoopConfiguration()
        .set(S3_BQ_ENDPOINT_CONFIG_NAME, S3_BQ_ENDPOINT_CONFIG_VALUE);

    Dataset<Row> inputData = null;

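    // Pick the Spark reader that matches the configured input file format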
    switch (inputFileFormat) {
      case GCS_BQ_CSV_FORMAT:
        inputData =
            spark
                .read()
                .format(GCS_BQ_CSV_FORMAT)
                .option(S3_BQ_HEADER, true)
                .option(S3_BQ_INFER_SCHEMA, true)
                .load(inputFileLocation);
        break;
      case S3_BQ_AVRO_FORMAT:
        inputData = spark.read().format(GCS_BQ_AVRO_EXTD_FORMAT).load(inputFileLocation);
        break;
      case S3_BQ_PRQT_FORMAT:
        inputData = spark.read().parquet(inputFileLocation);
        break;
      case S3_BQ_JSON_FORMAT:
        inputData =
            spark
                .read()
                .format(S3_BQ_JSON_FORMAT)
                .option(S3_BQ_INFER_SCHEMA, true)
                .load(inputFileLocation);
        break;
      default:
        throw new IllegalArgumentException(
            "Currently, avro, csv, json and parquet are the only supported input formats");
    }

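    // Write the input data to the target BigQuery table, staging the load through the temporary bucket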
    inputData
        .write()
        .format(S3_BQ_OUTPUT_FORMAT)
        .option(S3_BQ_HEADER, true)
        .option(S3_BQ_OUTPUT, bigQueryDataset + "." + bigQueryTable)
        .option(S3_BQ_TEMP_BUCKET, bqTempBucket)
        .mode(SaveMode.valueOf(outputMode))
        .save();
  }
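
A hedged, self-contained sketch of the same pipeline is shown below with literal option keys in place of the template's constants. It assumes the S3_BQ_* configuration names resolve to the standard Hadoop S3A settings (fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.endpoint) and that the output goes through the spark-bigquery-connector ("bigquery" format with the "table" and "temporaryGcsBucket" options); the class name, bucket, dataset, and table values are placeholders, and the authoritative constant values live in the template's constants class.

// Hypothetical standalone sketch, not part of the template source.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class S3ToBigQuerySketch {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("S3 to BigQuery load").getOrCreate();

    // Assumed S3A credential and endpoint keys for reading from S3 via s3a:// URIs
    spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "<ACCESS_KEY>");
    spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "<SECRET_KEY>");
    spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");

    // Read a CSV input from S3, treating the first row as a header and inferring the schema
    Dataset<Row> inputData =
        spark
            .read()
            .format("csv")
            .option("header", true)
            .option("inferSchema", true)
            .load("s3a://my-bucket/path/to/input/");

    // Write to BigQuery, staging the load through a temporary GCS bucket
    inputData
        .write()
        .format("bigquery")
        .option("table", "my_dataset.my_table")
        .option("temporaryGcsBucket", "my-temp-bucket")
        .mode(SaveMode.Append)
        .save();
  }
}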