in python/dataproc_templates/azure/azure_blob_storage_to_bigquery.py [0:0]
def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
    logger: Logger = self.get_logger(spark=spark)
    # Arguments
    input_file_location: str = args[constants.AZ_BLOB_BQ_INPUT_LOCATION]
    big_query_dataset: str = args[constants.AZ_BLOB_BQ_OUTPUT_DATASET]
    big_query_table: str = args[constants.AZ_BLOB_BQ_OUTPUT_TABLE]
    input_file_format: str = args[constants.AZ_BLOB_BQ_INPUT_FORMAT]
    bq_temp_bucket: str = args[constants.AZ_BLOB_BQ_LD_TEMP_BUCKET_NAME]
    output_mode: str = args[constants.AZ_BLOB_BQ_OUTPUT_MODE]
    storage_account: str = args[constants.AZ_BLOB_STORAGE_ACCOUNT]
    container_name: str = args[constants.AZ_BLOB_CONTAINER_NAME]
    sas_token: str = args[constants.AZ_BLOB_SAS_TOKEN]
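    # Authenticate to Azure Blob Storage by registering the container-scoped SAS
    # token with the Hadoop Azure (wasb/wasbs) connector; the input location is
    # expected to reference this storage account and container.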
    spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net", sas_token)
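    # Log the job parameters, excluding the SAS token so the secret is not
    # written to the logs.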
    ignore_keys = {constants.AZ_BLOB_SAS_TOKEN}
    filtered_args = {key: val for key, val in args.items() if key not in ignore_keys}
    logger.info(f"Starting Azure Blob Storage to BigQuery Spark job with parameters:\n {pprint.pformat(filtered_args)}")
    # Read
    input_data: DataFrame
    if input_file_format == constants.FORMAT_PRQT:
        input_data = spark.read \
            .parquet(input_file_location)
    elif input_file_format == constants.FORMAT_AVRO:
        input_data = spark.read \
            .format(constants.FORMAT_AVRO_EXTD) \
            .load(input_file_location)
    elif input_file_format == constants.FORMAT_CSV:
        input_data = spark.read \
            .format(constants.FORMAT_CSV) \
            .option(constants.HEADER, True) \
            .option(constants.INFER_SCHEMA, True) \
            .load(input_file_location)
    elif input_file_format == constants.FORMAT_JSON:
        input_data = spark.read \
            .json(input_file_location)
    elif input_file_format == constants.FORMAT_DELTA:
        input_data = spark.read \
            .format(constants.FORMAT_DELTA) \
            .load(input_file_location)
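    else:
        # Fail fast on an unsupported format; otherwise input_data would be
        # unbound and the write below would raise a NameError. The template's
        # argument parser may already restrict the accepted formats.
        raise ValueError(f"Unsupported input file format: {input_file_format}")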
    # Write
    input_data.write \
        .format(constants.FORMAT_BIGQUERY) \
        .option(constants.TABLE, big_query_dataset + "." + big_query_table) \
        .option(constants.AZ_BLOB_BQ_TEMP_BUCKET, bq_temp_bucket) \
        .mode(output_mode) \
        .save()