in source/glue-job-scripts/convert_parquet.py [0:0]
def main():
"""
This script gets JSON data from the input S3 bucket and convert the data to the parquet data.
As the JSON data can have various format, when converting the JSON data, it changes the data format so Athena can query with the specific data.
"""
global delimiter, message_alias_key_name, messages_key_name, \
message_quality_key_name, message_timestamp_key_name, message_value_key_name
# Sets Glue context and logging
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
job = Job(glue_context)
logger = glue_context.get_logger()
# Checks if there are new data in the input S3 bucket
logger.info(f"Partition: {PARTITION}")
logger.info("Checks the new data in the input S3 bucket...")
if has_new_s3_data():
# Gets the raw JSON data from the input S3 bucket
logger.info("Gets the raw JSON data from the input S3 bucket...")
raw_json_data = glue_context.create_dynamic_frame.from_options(
connection_options={
"paths": [f"s3://{INPUT_BUCKET}/{PARTITION}/"],
"recurse": True,
"groupFiles": "inPartition"
},
connection_type="s3",
format="json",
transformation_ctx="jsonsource"
)
# Gets the metadata from the config DynamoDB table
logger.info("Gets the metadata from the config DynamoDB table...")
metadata = get_metadata()
delimiter = metadata["Delimiter"]
messages_key_name = metadata["MessagesKeyName"]
message_alias_key_name = metadata["MessageAliasKeyName"]
message_quality_key_name = metadata["MessageQualityKeyName"]
message_timestamp_key_name = metadata["MessageTimestampKeyName"]
message_value_key_name = metadata["MessageValueKeyName"]
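# Illustrative example of the config item's shape (the key names are from the code above; the values are hypothetical):
# {"Delimiter": "/", "MessagesKeyName": "messages", "MessageAliasKeyName": "alias",
#  "MessageQualityKeyName": "quality", "MessageTimestampKeyName": "timestamp",
#  "MessageValueKeyName": "value"}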
logger.info(f"delimiter = {delimiter}, message_alias_key_name = {message_alias_key_name}, messages_key_name = {messages_key_name}, message_timestamp_key_name = {message_timestamp_key_name}, message_value_key_name = {message_value_key_name}")
# Converts the raw data
logger.info("Converts the raw data...")
converted_json_data = Map.apply(frame=raw_json_data, f=convert_data_format)
# Flattens the converted array data into multiple rows
logger.info("Flattens the converted array data into multiple rows...")
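# explode() emits one row per element of the "messages" array; "collection.*" then promotes
# each element's fields to top-level columns so they become Parquet columns.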
exploded_data = converted_json_data.toDF().select(explode(col("messages")).alias("collection")).select("collection.*")
mapped_data = DynamicFrame.fromDF(exploded_data, glue_context, "mapped_data")
# Writes the parquet data to the output S3 bucket
logger.info("Writes the parquet data to the output S3 bucket...")
glue_context.write_dynamic_frame.from_options(
connection_options={
"path": f"s3://{OUTPUT_BUCKET}/{PARTITION}/"
},
connection_type="s3",
format="parquet",
frame=mapped_data,
transformation_ctx="parquetoutput"
)
else:
logger.info("There is no new data in the input bucket.")
raise NoNewDataException(f"No new data for {PARTITION} partition.")
# Bookmarks the processing
job.commit()
logger.info("All done")