refarch/aws-native/batch/glue-scripts/raw2clean_hudi.py [59:84]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    # Read the raw table through the Glue catalog as a DynamicFrame.
    # raw_db_name and raw_table_name are defined outside this excerpt.
    raw_data: DynamicFrame = glue_context.create_dynamic_frame.from_catalog(database=raw_db_name,
                                                                            table_name=raw_table_name,
                                                                            transformation_ctx="raw_data")

    # Terminate early if there is no data to process: when the converted
    # DataFrame has no first row, commit the job and leave the function.
    if raw_data.toDF().head() is None:
        job.commit()
        return

    ## @type: CleanDataset
    ## @args: []
    ## @return: cleaned_data
    ## @inputs: [frame = raw_data]
    # NOTE(review): this is a second raw_data.toDF() call (the emptiness check
    # above already made one); the first result could presumably be reused -- confirm.
    input_data = raw_data.toDF()
    # Every select below iterates input_data.columns and aliases each rewritten
    # expression back to its original name, so the column names and their order
    # are unchanged; only the values/types of the matching columns change.
    # The steps are order-dependent and must stay in this sequence.
    #
    # 1. 'processing_datetime' is passed through from_unixtime
    #    (presumably it holds epoch seconds -- TODO confirm against the raw schema).
    cleaned_data = input_data.select(*[from_unixtime(c).alias(c) if c == 'processing_datetime' else col(c) for c in input_data.columns])
    # 2. Every column whose name ends with '_datetime' is passed through
    #    to_timestamp; this includes 'processing_datetime' from step 1.
    cleaned_data = cleaned_data.select(*[to_timestamp(c).alias(c) if c.endswith('_datetime') else col(c) for c in input_data.columns])
    # 3. Every column whose name ends with '_date' is passed through to_date.
    cleaned_data = cleaned_data.select(*[to_date(c).alias(c) if c.endswith('_date') else col(c) for c in input_data.columns])
    # 4. The 'zip' column is cast to string.
    cleaned_data = cleaned_data.select(*[col(c).cast('string').alias(c) if c == 'zip' else col(c) for c in input_data.columns])
    # 5. Columns whose dtype in input_data (i.e. before steps 1-4) is 'double'
    #    are cast to decimal(15,2).
    #    NOTE(review): dict(input_data.dtypes) is rebuilt for every column and
    #    could be computed once before the comprehension.
    cleaned_data = cleaned_data.select(*[col(c).cast('decimal(15,2)').alias(c) if dict (input_data.dtypes) [c] == 'double' else col(c) for c in input_data.columns])

    ## @type: EnrichDataset
    ## @args: []
    ## @return: enriched_data
    ## @inputs: [frame = cleaned_data]
    # Adds two columns (processing_start_datetime, date_column and
    # datetime_column are defined outside this excerpt):
    #   - 'etl_processing_datetime': processing_start_datetime parsed with the
    #     pattern 'yyyy-MM-dd HH:mm:ss' and cast to timestamp.
    #   - the column named by date_column: the datetime_column value formatted
    #     as 'yyyy-MM-dd' and cast to date.
    enriched_data = cleaned_data.withColumn('etl_processing_datetime', unix_timestamp(f.lit(processing_start_datetime), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")) \
        .withColumn(date_column, f.date_format(f.col(datetime_column), "yyyy-MM-dd").cast("date"))
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



refarch/aws-native/batch/glue-scripts/raw2clean_parquet.py [45:70]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    # Read the raw table through the Glue catalog as a DynamicFrame.
    # raw_db_name and raw_table_name are defined outside this excerpt.
    raw_data: DynamicFrame = glue_context.create_dynamic_frame.from_catalog(database=raw_db_name,
                                                                            table_name=raw_table_name,
                                                                            transformation_ctx="raw_data")

    # Terminate early if there is no data to process: when the converted
    # DataFrame has no first row, commit the job and leave the function.
    if raw_data.toDF().head() is None:
        job.commit()
        return

    ## @type: CleanDataset
    ## @args: []
    ## @return: cleaned_data
    ## @inputs: [frame = raw_data]
    # NOTE(review): this is a second raw_data.toDF() call (the emptiness check
    # above already made one); the first result could presumably be reused -- confirm.
    input_data = raw_data.toDF()
    # Every select below iterates input_data.columns and aliases each rewritten
    # expression back to its original name, so the column names and their order
    # are unchanged; only the values/types of the matching columns change.
    # The steps are order-dependent and must stay in this sequence.
    #
    # 1. 'processing_datetime' is passed through from_unixtime
    #    (presumably it holds epoch seconds -- TODO confirm against the raw schema).
    cleaned_data = input_data.select(*[from_unixtime(c).alias(c) if c == 'processing_datetime' else col(c) for c in input_data.columns])
    # 2. Every column whose name ends with '_datetime' is passed through
    #    to_timestamp; this includes 'processing_datetime' from step 1.
    cleaned_data = cleaned_data.select(*[to_timestamp(c).alias(c) if c.endswith('_datetime') else col(c) for c in input_data.columns])
    # 3. Every column whose name ends with '_date' is passed through to_date.
    cleaned_data = cleaned_data.select(*[to_date(c).alias(c) if c.endswith('_date') else col(c) for c in input_data.columns])
    # 4. The 'zip' column is cast to string.
    cleaned_data = cleaned_data.select(*[col(c).cast('string').alias(c) if c == 'zip' else col(c) for c in input_data.columns])
    # 5. Columns whose dtype in input_data (i.e. before steps 1-4) is 'double'
    #    are cast to decimal(15,2).
    #    NOTE(review): dict(input_data.dtypes) is rebuilt for every column and
    #    could be computed once before the comprehension.
    cleaned_data = cleaned_data.select(*[col(c).cast('decimal(15,2)').alias(c) if dict (input_data.dtypes) [c] == 'double' else col(c) for c in input_data.columns])

    ## @type: EnrichDataset
    ## @args: []
    ## @return: enriched_data
    ## @inputs: [frame = cleaned_data]
    # Adds two columns (processing_start_datetime, date_column and
    # datetime_column are defined outside this excerpt):
    #   - 'etl_processing_datetime': processing_start_datetime parsed with the
    #     pattern 'yyyy-MM-dd HH:mm:ss' and cast to timestamp.
    #   - the column named by date_column: the datetime_column value formatted
    #     as 'yyyy-MM-dd' and cast to date.
    enriched_data = cleaned_data.withColumn('etl_processing_datetime', unix_timestamp(f.lit(processing_start_datetime), 'yyyy-MM-dd HH:mm:ss').cast("timestamp")) \
        .withColumn(date_column, f.date_format(f.col(datetime_column), "yyyy-MM-dd").cast("date"))
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



