def run()

in python/dataproc_templates/s3/s3_to_bigquery.py

Reads files from Amazon S3 in Parquet, Avro, CSV, or JSON format and writes them to a BigQuery table. The method relies on imports from the top of the file (not shown in this excerpt), approximately:

    import pprint
    from logging import Logger
    from typing import Any, Dict

    from pyspark.sql import SparkSession, DataFrame

    import dataproc_templates.util.template_constants as constants

    def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:

        logger: Logger = self.get_logger(spark=spark)
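        # get_logger is assumed to be inherited from the template base class,
        # returning a logger wired to the Spark JVM's log4j configuration.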

        # Arguments
        input_file_location: str = args[constants.S3_BQ_INPUT_LOCATION]
        access_key: str = args[constants.S3_BQ_ACCESS_KEY]
        secret_key: str = args[constants.S3_BQ_SECRET_KEY]
        input_file_format: str = args[constants.S3_BQ_INPUT_FORMAT]
        bq_dataset: str = args[constants.S3_BQ_OUTPUT_DATASET_NAME]
        bq_table: str = args[constants.S3_BQ_OUTPUT_TABLE_NAME]
        bq_temp_bucket: str = args[constants.S3_BQ_TEMP_BUCKET_NAME]
        output_mode: str = args[constants.S3_BQ_OUTPUT_MODE]
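        # Every key above is assumed to be present: argument parsing and
        # validation happen upstream in the template's parse_args, before
        # run() is invoked.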

        # Log the job parameters, excluding the AWS credentials.
        ignore_keys = {constants.S3_BQ_ACCESS_KEY, constants.S3_BQ_SECRET_KEY}
        filtered_args = {key: val for key, val in args.items() if key not in ignore_keys}
        logger.info(
            "Starting Amazon S3 to BigQuery Spark job with parameters:\n"
            f"{pprint.pformat(filtered_args)}"
        )

        # Set configuration to connect to Amazon S3
        hadoop_conf = spark._jsc.hadoopConfiguration()
        hadoop_conf.set(constants.AWS_S3ENDPOINT, constants.S3_BQ_ENDPOINT_VALUE)
        hadoop_conf.set(constants.AWS_S3ACCESSKEY, access_key)
        hadoop_conf.set(constants.AWS_S3SECRETKEY, secret_key)
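        # (The AWS_* keys are expected to resolve to the Hadoop s3a connector
        # properties, i.e. fs.s3a.endpoint / fs.s3a.access.key /
        # fs.s3a.secret.key, so s3a:// paths can authenticate.)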

        # Read
        input_data: DataFrame

        if input_file_format == constants.FORMAT_PRQT:
            input_data = spark.read \
                .parquet(input_file_location)
        elif input_file_format == constants.FORMAT_AVRO:
            # FORMAT_AVRO_EXTD presumably names the external Avro data source;
            # reading Avro requires the spark-avro package on the classpath.
            input_data = spark.read \
                .format(constants.FORMAT_AVRO_EXTD) \
                .load(input_file_location)
        elif input_file_format == constants.FORMAT_CSV:
            # Treat the first row as a header and infer column types; schema
            # inference costs an extra pass over the input files.
            input_data = spark.read \
                .format(constants.FORMAT_CSV) \
                .option(constants.CSV_HEADER, True) \
                .option(constants.CSV_INFER_SCHEMA, True) \
                .load(input_file_location)
        elif input_file_format == constants.FORMAT_JSON:
            input_data = spark.read \
                .json(input_file_location)
        else:
            # An unrecognised format would otherwise leave input_data unbound
            # and surface later as a NameError at the write stage.
            raise ValueError(f"Unsupported input file format: {input_file_format}")

        # Write: the Spark BigQuery connector stages the output in the GCS
        # temporary bucket before loading it into the target table.
        input_data.write \
            .format(constants.FORMAT_BIGQUERY) \
            .option(constants.TABLE, f"{bq_dataset}.{bq_table}") \
            .option(constants.TEMP_GCS_BUCKET, bq_temp_bucket) \
            .mode(output_mode) \
            .save()
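
Usage sketch (not part of the excerpt): one way to drive this method directly, assuming the dot-separated argument keys documented for the S3-to-BigQuery template and the class name S3ToBigQueryTemplate from this module; the authoritative key strings live in dataproc_templates.util.template_constants.

    from pyspark.sql import SparkSession

    from dataproc_templates.s3.s3_to_bigquery import S3ToBigQueryTemplate

    spark = SparkSession.builder \
        .appName("s3-to-bigquery") \
        .getOrCreate()

    # Hypothetical values; the keys are assumed to match the template's constants.
    args = {
        "s3.bq.input.location": "s3a://example-bucket/input/",
        "s3.bq.access.key": "<AWS_ACCESS_KEY>",
        "s3.bq.secret.key": "<AWS_SECRET_KEY>",
        "s3.bq.input.format": "parquet",
        "s3.bq.output.dataset.name": "example_dataset",
        "s3.bq.output.table.name": "example_table",
        "s3.bq.temp.bucket.name": "example-temp-bucket",
        "s3.bq.output.mode": "append",
    }

    S3ToBigQueryTemplate().run(spark=spark, args=args)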