def main()

in source/glue-job-scripts/convert_parquet.py [0:0]
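
This excerpt shows only main(); a runnable version of the script also needs the imports and job parameters it references. A minimal sketch, assuming the bucket and partition names arrive as environment variables (the actual script may resolve them via getResolvedOptions instead):

import os

import boto3  # assumed here for the DynamoDB config lookup in get_metadata()
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import Map
from pyspark.context import SparkContext
from pyspark.sql.functions import col, explode

# Assumed job parameters; the actual script may resolve them differently.
INPUT_BUCKET = os.environ["INPUT_BUCKET"]
OUTPUT_BUCKET = os.environ["OUTPUT_BUCKET"]
PARTITION = os.environ["PARTITION"]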


def main():
    """
    This script gets JSON data from the input S3 bucket and convert the data to the parquet data.
    As the JSON data can have various format, when converting the JSON data, it changes the data format so Athena can query with the specific data.
    """

    global delimiter, message_alias_key_name, messages_key_name, \
        message_quality_key_name, message_timestamp_key_name, message_value_key_name

    # Sets Glue context and logging
    spark_context = SparkContext()
    glue_context = GlueContext(spark_context)
    job = Job(glue_context)
    logger = glue_context.get_logger()

    # Checks whether there is new data in the input S3 bucket
    logger.info(f"Partition: {PARTITION}")
    logger.info("Checks the new data in the input S3 bucket...")
    if has_new_s3_data():
        # Gets the raw JSON data from the input S3 bucket
        logger.info("Gets the raw JSON data from the input S3 bucket...")
        raw_json_data = glue_context.create_dynamic_frame.from_options(
            connection_options={
                "paths": [f"s3://{INPUT_BUCKET}/{PARTITION}/"],
                "recurse": True,
                "groupFiles": "inPartition"
            },
            connection_type="s3",
            format="json",
            transformation_ctx="jsonsource"
        )

        # Gets the metadata from the config DynamoDB table
        logger.info("Gets the metadata from the config DynamoDB table...")
        metadata = get_metadata()
        delimiter = metadata["Delimiter"]
        messages_key_name = metadata["MessagesKeyName"]
        message_alias_key_name = metadata["MessageAliasKeyName"]
        message_quality_key_name = metadata["MessageQualityKeyName"]
        message_timestamp_key_name = metadata["MessageTimestampKeyName"]
        message_value_key_name = metadata["MessageValueKeyName"]
        logger.info(f"delimiter = {delimiter}, message_alias_key_name = {message_alias_key_name}, messages_key_name = {messages_key_name}, message_timestamp_key_name = {message_timestamp_key_name}, message_value_key_name = {message_value_key_name}")

        # Converts the raw data
        logger.info("Converts the raw data...")
        converted_json_data = Map.apply(frame=raw_json_data, f=convert_data_format)

        # Flattens the converted array data into multiple rows
        logger.info("Flattens the converted array data into multiple rows...")
        exploded_data = converted_json_data.toDF().select(explode(col("messages")).alias("collection")).select("collection.*")
        mapped_data = DynamicFrame.fromDF(exploded_data, glue_context, "mapped_data")

        # Writes the parquet data to the output S3 bucket
        logger.info("Writes the parquet data to the output S3 bucket...")
        glue_context.write_dynamic_frame.from_options(
            connection_options={
                "path": f"s3://{OUTPUT_BUCKET}/{PARTITION}/"
            },
            connection_type="s3",
            format="parquet",
            frame=mapped_data,
            transformation_ctx="parquetoutput"
        )
    else:
        logger.info("There is no new data in the input bucket.")
        raise NoNewDataException(f"No new data for {PARTITION} partition.")

    # Commits the job bookmark so data already read through the transformation contexts is skipped on the next run
    job.commit()
    logger.info("All done")