in source/glue-job-scripts/etl-data-export.py
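# NOTE: main() relies on names defined at module level elsewhere in this
# script (JOB_TYPE, the *_TABLE_NAME constants, GLUE_DB_NAME,
# GLUE_OUTPUT_BUCKET, glue_context, job, log_message, get_column_mapping,
# and JobInputException). A minimal sketch of that setup, assuming the
# standard AWS Glue boilerplate, would look like:
#
#   import sys
#   from awsglue.transforms import ApplyMapping, SelectFields, ResolveChoice
#   from awsglue.utils import getResolvedOptions
#   from awsglue.context import GlueContext
#   from awsglue.job import Job
#   from pyspark.context import SparkContext
#
#   args = getResolvedOptions(sys.argv, ["JOB_NAME", "JOB_TYPE"])
#   JOB_TYPE = args["JOB_TYPE"]
#   glue_context = GlueContext(SparkContext.getOrCreate())
#   job = Job(glue_context)
#   job.init(args["JOB_NAME"], args)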
def main():
    """Load data from the supplied DynamoDB table to S3 so it can be analyzed with Athena."""
    if JOB_TYPE == "issues":
        DDB_TABLE_NAME = DDB_ISSUES_TABLE_NAME
        GLUE_TABLE_NAME = GLUE_ISSUES_TABLE_NAME
        FIELD_PATHS = [
            "eventid",
            "acknowledged",
            "created",
            "sitename",
            "issuesource",
            "priority",
            "areaname#status#processname#eventdescription#stationname#devicename#created",
            "version",
            "devicename",
            "devicename#eventid",
            "createdat",
            "areaname",
            "processname",
            "createddateutc",
            "eventdescription",
            "areaname#status#processname#stationname#devicename#created",
            "stationname",
            "id",
            "acknowledgedtime",
            "status",
            "updatedat",
            "closed",
            "resolutiontime",
            "createdby",
            "acknowledgedby",
            "closedby",
            "rejectedby",
            "additionaldetails"
        ]
    elif JOB_TYPE == "hierarchy":
        DDB_TABLE_NAME = DDB_DATA_HIERARCHY_TABLE_NAME
        GLUE_TABLE_NAME = GLUE_DATA_HIERARCHY_TABLE_NAME
        FIELD_PATHS = [
            "createdat",
            "name",
            "description",
            "id",
            "devicestationid",
            "type",
            "version",
            "parentid",
            "updatedat",
            "areasiteid",
            "eventprocessid",
            "eventtype",
            "priority",
            "rootcauses",
            "sms",
            "eventimgkey",
            "email",
            "protocol",
            "endpoint",
            "filterpolicy",
            "subscriptionarn",
            "stationareaid",
            "processareaid",
            "alias"
        ]
    else:
        raise JobInputException(f"JOB_TYPE was invalid ({JOB_TYPE}). Expecting either \"issues\" or \"hierarchy\"")
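    # JOB_TYPE is presumably passed in as a Glue job argument (e.g. --JOB_TYPE issues)
    # and resolved at module level via getResolvedOptions; see the sketch above.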
    log_message([
        "Running with the following context:",
        f"DDB_TABLE_NAME: {DDB_TABLE_NAME}",
        f"GLUE_TABLE_NAME: {GLUE_TABLE_NAME}",
        f"GLUE_DB_NAME: {GLUE_DB_NAME}",
        f"GLUE_OUTPUT_BUCKET: {GLUE_OUTPUT_BUCKET}"
    ])

    # Normalize the DynamoDB table name to match Glue catalog naming
    # (lowercase, underscores), e.g. "My-Issues-Table" -> "my_issues_table"
    DDB_TABLE_NAME_FORMATTED = DDB_TABLE_NAME.lower().replace('-', '_')

    log_message("Mapping columns")
    COLUMN_MAPPINGS = [get_column_mapping(field_path) for field_path in FIELD_PATHS]
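    # get_column_mapping is defined elsewhere in this script. ApplyMapping.apply
    # expects each mapping as a (source_path, source_type, target_path, target_type)
    # tuple, so a plausible sketch (an assumption, not the actual helper) is:
    #
    #   def get_column_mapping(field_path):
    #       # Map each DynamoDB attribute onto a same-named string column
    #       return (field_path, "string", field_path, "string")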
log_message("Creating a Dynamic Frame from the DynamoDB table schema")
datasource0 = glue_context.create_dynamic_frame.from_catalog(
database = GLUE_DB_NAME,
table_name = DDB_TABLE_NAME_FORMATTED,
transformation_ctx = "datasource0"
)
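    # If the export ever throttles the source table, a read-throughput cap can
    # be passed here (assuming the catalog entry points at DynamoDB), e.g.:
    #
    #   additional_options={"dynamodb.throughput.read.percent": "0.5"}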
log_message("Applying column mappings")
applymapping1 = ApplyMapping.apply(
frame = datasource0,
mappings = COLUMN_MAPPINGS,
transformation_ctx = "applymapping1"
)
log_message("Selecting fields")
selectfields2 = SelectFields.apply(
frame = applymapping1,
paths = FIELD_PATHS,
transformation_ctx = "selectfields2"
)
log_message("Resolving")
resolvechoice3 = ResolveChoice.apply(
frame = selectfields2,
choice = "MATCH_CATALOG",
database = GLUE_DB_NAME,
table_name = GLUE_TABLE_NAME,
transformation_ctx = "resolvechoice3"
)
resolvechoice4 = ResolveChoice.apply(
frame = resolvechoice3,
choice = "make_struct",
transformation_ctx = "resolvechoice4"
)
log_message("Persisting data in S3")
glue_context.write_dynamic_frame.from_catalog(
frame = resolvechoice4,
database = GLUE_DB_NAME,
table_name = GLUE_TABLE_NAME,
transformation_ctx = "datasink5"
)
job.commit()
log_message("Done")