def run()

in python/dataproc_templates/azure/azure_blob_storage_to_bigquery.py [0:0]


    def run(self, spark: SparkSession, args: Dict[str, Any]) -> None:
        """Load files from an Azure Blob Storage location into a BigQuery table.

        Args:
            spark: Spark session used to set the SAS token configuration and
                to read the input data.
            args: Template arguments, keyed by the ``constants.AZ_BLOB_*``
                names read at the top of this method.

        Raises:
            KeyError: If any of the required keys is missing from ``args``.
            ValueError: If the input file format is not one of the formats
                handled below (parquet, avro, csv, json, delta constants).
        """

        logger: Logger = self.get_logger(spark=spark)

        # Arguments
        input_file_location: str = args[constants.AZ_BLOB_BQ_INPUT_LOCATION]
        big_query_dataset: str = args[constants.AZ_BLOB_BQ_OUTPUT_DATASET]
        big_query_table: str = args[constants.AZ_BLOB_BQ_OUTPUT_TABLE]
        input_file_format: str = args[constants.AZ_BLOB_BQ_INPUT_FORMAT]
        bq_temp_bucket: str = args[constants.AZ_BLOB_BQ_LD_TEMP_BUCKET_NAME]
        output_mode: str = args[constants.AZ_BLOB_BQ_OUTPUT_MODE]
        storage_account: str = args[constants.AZ_BLOB_STORAGE_ACCOUNT]
        container_name: str = args[constants.AZ_BLOB_CONTAINER_NAME]
        sas_token: str = args[constants.AZ_BLOB_SAS_TOKEN]

        # Register the SAS token under the configuration key scoped to this
        # container and storage account.
        spark.conf.set(f"fs.azure.sas.{container_name}.{storage_account}.blob.core.windows.net", sas_token)

        # The SAS token is a credential: keep it out of the logged parameters.
        ignore_keys = {constants.AZ_BLOB_SAS_TOKEN}
        filtered_args = {key: val for key, val in args.items() if key not in ignore_keys}
        logger.info(f"Starting Azure to BigQuery spark job with parameters:\n {pprint.pformat(filtered_args)}")

        # Read
        input_data: DataFrame

        if input_file_format == constants.FORMAT_PRQT:
            input_data = spark.read \
                .parquet(input_file_location)
        elif input_file_format == constants.FORMAT_AVRO:
            input_data = spark.read \
                .format(constants.FORMAT_AVRO_EXTD) \
                .load(input_file_location)
        elif input_file_format == constants.FORMAT_CSV:
            # CSV is read with a header row and schema inference enabled.
            input_data = spark.read \
                .format(constants.FORMAT_CSV) \
                .option(constants.HEADER, True) \
                .option(constants.INFER_SCHEMA, True) \
                .load(input_file_location)
        elif input_file_format == constants.FORMAT_JSON:
            input_data = spark.read \
                .json(input_file_location)
        elif input_file_format == constants.FORMAT_DELTA:
            input_data = spark.read \
                .format(constants.FORMAT_DELTA) \
                .load(input_file_location)
        else:
            # Without this branch ``input_data`` would stay unbound and the
            # write below would fail with an opaque UnboundLocalError.
            raise ValueError(
                f"Unsupported input file format: {input_file_format!r}"
            )

        # Write
        input_data.write \
            .format(constants.FORMAT_BIGQUERY) \
            .option(constants.TABLE, f"{big_query_dataset}.{big_query_table}") \
            .option(constants.AZ_BLOB_BQ_TEMP_BUCKET, bq_temp_bucket) \
            .mode(output_mode) \
            .save()