in python/dataproc_templates/s3/s3_to_bigquery.py [0:0]
def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:

    logger: Logger = self.get_logger(spark=spark)

    # Arguments
    input_file_location: str = args[constants.S3_BQ_INPUT_LOCATION]
    access_key: str = args[constants.S3_BQ_ACCESS_KEY]
    secret_key: str = args[constants.S3_BQ_SECRET_KEY]
    input_file_format: str = args[constants.S3_BQ_INPUT_FORMAT]
    bq_dataset: str = args[constants.S3_BQ_OUTPUT_DATASET_NAME]
    bq_table: str = args[constants.S3_BQ_OUTPUT_TABLE_NAME]
    bq_temp_bucket: str = args[constants.S3_BQ_TEMP_BUCKET_NAME]
    output_mode: str = args[constants.S3_BQ_OUTPUT_MODE]
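    # Keep the AWS credentials out of the parameters that get written to the log.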
    ignore_keys = {constants.S3_BQ_ACCESS_KEY, constants.S3_BQ_SECRET_KEY}
    filtered_args = {key: val for key, val in args.items() if key not in ignore_keys}
    logger.info(
        "Starting Amazon S3 to BigQuery Spark job with parameters:\n"
        f"{pprint.pformat(filtered_args)}"
    )
    # Set configuration to connect to Amazon S3
    spark._jsc.hadoopConfiguration() \
        .set(constants.AWS_S3ENDPOINT, constants.S3_BQ_ENDPOINT_VALUE)
    spark._jsc.hadoopConfiguration() \
        .set(constants.AWS_S3ACCESSKEY, access_key)
    spark._jsc.hadoopConfiguration() \
        .set(constants.AWS_S3SECRETKEY, secret_key)
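    # Assumption: the AWS_S3* constants resolve to the Hadoop S3A connector
    # properties (fs.s3a.endpoint, fs.s3a.access.key, fs.s3a.secret.key), so the
    # input location should be an s3a:// URI for these per-job settings to apply.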
    # Read
    input_data: DataFrame

    if input_file_format == constants.FORMAT_PRQT:
        input_data = spark.read \
            .parquet(input_file_location)
    elif input_file_format == constants.FORMAT_AVRO:
        input_data = spark.read \
            .format(constants.FORMAT_AVRO_EXTD) \
            .load(input_file_location)
    elif input_file_format == constants.FORMAT_CSV:
        input_data = spark.read \
            .format(constants.FORMAT_CSV) \
            .option(constants.CSV_HEADER, True) \
            .option(constants.CSV_INFER_SCHEMA, True) \
            .load(input_file_location)
    elif input_file_format == constants.FORMAT_JSON:
        input_data = spark.read \
            .json(input_file_location)
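    # No else branch: the input format is assumed to be validated against the
    # four supported formats before run() is called, so input_data is always
    # assigned by one of the branches above.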
    # Write to BigQuery; the connector stages the data in the temporary GCS
    # bucket before loading it into the target table.
    input_data.write \
        .format(constants.FORMAT_BIGQUERY) \
        .option(constants.TABLE, bq_dataset + "." + bq_table) \
        .option(constants.TEMP_GCS_BUCKET, bq_temp_bucket) \
        .mode(output_mode) \
        .save()
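    # output_mode is passed straight to DataFrameWriter.mode(), so the standard
    # Spark save modes (append, overwrite, ignore, errorifexists) apply.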