quickstarts/microsoft.datalakestore/yash-datalake/scripts/raw_to_transformed.py

import sys
import datetime

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Capture the current date; the commented-out withColumn calls below would use
# these values to add year/month/day columns to each dataset.
now = datetime.datetime.now()
year = str(now.year)
month = str(now.month)
day = str(now.day)
today = str(now.date())


def main(storage_account_name, container_name):
    spark = SparkSession.builder.appName("rawToTransformed").getOrCreate()

    # Customers: read the raw CSV, drop rows containing nulls, write to the transformed container.
    customer_df = spark.read.csv("wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net/datasets/customers")
    customer_df = customer_df.na.drop()
    # customer_df = customer_df.withColumn("year", lit(year))
    # customer_df = customer_df.withColumn("month", lit(month))
    # customer_df = customer_df.withColumn("day", lit(day))
    customer_df.write.csv("wasbs://transformed@" + storage_account_name + ".blob.core.windows.net/datasets/customers", mode='overwrite')

    # Demographics
    demographic_df = spark.read.csv("wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net/datasets/demographics")
    demographic_df = demographic_df.na.drop()
    # demographic_df = demographic_df.withColumn("year", lit(year))
    # demographic_df = demographic_df.withColumn("month", lit(month))
    # demographic_df = demographic_df.withColumn("day", lit(day))
    demographic_df.write.csv("wasbs://transformed@" + storage_account_name + ".blob.core.windows.net/datasets/demographics", mode='overwrite')

    # Orders
    order_df = spark.read.csv("wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net/datasets/orders")
    order_df = order_df.na.drop()
    # order_df = order_df.withColumn("year", lit(year))
    # order_df = order_df.withColumn("month", lit(month))
    # order_df = order_df.withColumn("day", lit(day))
    order_df.write.csv("wasbs://transformed@" + storage_account_name + ".blob.core.windows.net/datasets/orders", mode='overwrite')

    # Products
    product_df = spark.read.csv("wasbs://" + container_name + "@" + storage_account_name + ".blob.core.windows.net/datasets/products")
    product_df = product_df.na.drop()
    # product_df = product_df.withColumn("year", lit(year))
    # product_df = product_df.withColumn("month", lit(month))
    # product_df = product_df.withColumn("day", lit(day))
    product_df.write.csv("wasbs://transformed@" + storage_account_name + ".blob.core.windows.net/datasets/products", mode='overwrite')

    spark.stop()


if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])
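
The four read/clean/write blocks above repeat the same pattern for each dataset. The sketch below shows the equivalent logic expressed as a single loop; it is illustrative only and not part of the quickstart. The dataset names and wasbs:// container layout are taken from the script above, while the wasbs_path helper and DATASETS list are hypothetical names introduced here for clarity.

import sys

from pyspark.sql import SparkSession

# Dataset names as used by the script above (hypothetical grouping for this sketch).
DATASETS = ["customers", "demographics", "orders", "products"]


def wasbs_path(container_name, storage_account_name, dataset):
    # Hypothetical helper: builds the same wasbs:// URL pattern used in the script above.
    return "wasbs://{}@{}.blob.core.windows.net/datasets/{}".format(
        container_name, storage_account_name, dataset)


def main(storage_account_name, container_name):
    spark = SparkSession.builder.appName("rawToTransformed").getOrCreate()
    for dataset in DATASETS:
        df = spark.read.csv(wasbs_path(container_name, storage_account_name, dataset))
        df = df.na.drop()  # drop rows containing nulls before writing
        df.write.csv(wasbs_path("transformed", storage_account_name, dataset), mode="overwrite")
    spark.stop()


if __name__ == "__main__":
    main(sys.argv[1], sys.argv[2])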