in sdlf-datalakeLibrary/python/datalake_library/octagon/peh.py [0:0]
def update_pipeline_execution(self, status, component=None, issue_comment=None):
    """Record a status change on the current pipeline execution history item.

    Appends a history entry to the execution record identified by
    ``self.client.pipeline_execution_id``, bumps its ``version`` and refreshes
    its status/timestamp attributes. When ``status`` is one of
    ``PEH_STATUS_COMPLETED``, ``PEH_STATUS_CANCELED`` or ``PEH_STATUS_FAILED``
    the record is additionally marked inactive and its end timestamp, success
    flag and duration are stored. For ``PEH_STATUS_COMPLETED`` the pipeline
    record named ``self.client.pipeline_name`` is also updated with the
    details of this execution.

    Args:
        status: New execution status; stored as-is and concatenated with the
            timestamp, so it is expected to be a string.
        component: Optional component name; when truthy it is added to the
            appended history entry.
        issue_comment: Optional comment; stored only when
            ``is_not_empty(issue_comment)`` is true.

    Returns:
        True once the update(s) have been issued.

    Raises:
        Whatever ``throw_if_false`` raises when no pipeline execution is
        assigned or the execution record is missing/inactive.
        KeyError: if the pipelines table lookup response has no ``"Item"``
            entry (only reachable for ``PEH_STATUS_COMPLETED``).
    """
    self.logger.debug("peh update_pipeline_execution() called")

    throw_if_false(self.client.is_pipeline_set(), "Pipeline execution is not yet assigned")

    peh_id = self.client.pipeline_execution_id
    peh_rec = self.get_peh_record(peh_id)
    # A missing record is treated the same way as an inactive one.
    is_active = peh_rec["active"] if peh_rec else False
    throw_if_false(is_active, "Pipeline execution is not active")

    version = peh_rec["version"]
    start_time = peh_rec["start_timestamp"]

    current_time = datetime.datetime.utcnow()
    utc_time_iso = get_timestamp_iso(current_time)
    local_date_iso = get_local_date()

    is_terminal = status in (PEH_STATUS_COMPLETED, PEH_STATUS_CANCELED, PEH_STATUS_FAILED)

    history_entry = {"status": status, "timestamp": utc_time_iso}
    if component:
        history_entry["component"] = component

    # Attributes written for every status change. Placeholders (#X / :X) are
    # used for all attribute names and values in the update expression.
    expr_names = {
        "#H": "history",
        "#St": "status",
        "#STT": "status_last_updated_timestamp",
        "#V": "version",
        "#LUT": "last_updated_timestamp",
    }
    expr_values = {
        ":H": [history_entry],
        ":St": status,
        ":STT": status + "#" + utc_time_iso,
        ":V": version,
        ":INC": 1,
        ":LUT": utc_time_iso,
    }
    set_clauses = [
        "#H = list_append(#H, :H)",
        "#St = :St",
        "#STT = :STT",
        "#V = :V + :INC",
        "#LUT = :LUT",
    ]

    duration = None
    if is_terminal:
        # Terminal statuses close the execution: store outcome and duration.
        duration = Decimal(str(get_duration_sec(start_time, utc_time_iso)))
        expr_names.update(
            {
                "#A": "active",
                "#ETS": "end_timestamp",
                "#S": "success",
                "#D": "duration_in_seconds",
            }
        )
        expr_values.update(
            {
                ":A": False,
                ":ETS": utc_time_iso,
                ":S": status == PEH_STATUS_COMPLETED,
                ":D": duration,
            }
        )
        set_clauses += ["#A = :A", "#ETS = :ETS", "#S = :S", "#D = :D"]

    if is_not_empty(issue_comment):
        # NOTE(review): terminal statuses store the comment under
        # "issue_comment" while non-terminal ones use "comment". This mirrors
        # the existing behavior; confirm whether the difference is intended.
        expr_names["#C"] = "issue_comment" if is_terminal else "comment"
        expr_values[":C"] = issue_comment
        set_clauses.append("#C = :C")

    self.peh_table.update_item(
        Key={"id": peh_id},
        UpdateExpression="SET " + ", ".join(set_clauses),
        ExpressionAttributeValues=expr_values,
        ExpressionAttributeNames=expr_names,
        ReturnValues="UPDATED_NEW",
    )

    # Add pipeline update for COMPLETED Executions
    if status == PEH_STATUS_COMPLETED:
        pipeline_name = self.client.pipeline_name
        self.logger.debug(f"Pipeline: {pipeline_name}")
        item = self.pipelines_table.get_item(
            Key={"name": pipeline_name}, ConsistentRead=True, AttributesToGet=["name", "version"]
        )["Item"]
        pipeline_expr_names = {
            "#V": "version",
            "#U": "last_updated_timestamp",
            "#P": "last_execution_id",
            "#D": "last_execution_date",
            "#E": "last_execution_timestamp",
            "#S": "last_execution_status",
            "#X": "last_execution_duration_in_seconds",
        }
        pipeline_expr_values = {
            ":V": item["version"],
            ":INC": 1,
            ":S": status,
            ":P": peh_id,
            ":D": local_date_iso,
            ":E": utc_time_iso,
            ":U": utc_time_iso,
            ":X": duration,
        }
        self.pipelines_table.update_item(
            Key={"name": pipeline_name},
            UpdateExpression="SET #P = :P, #V = :V + :INC, #S = :S, #D = :D, #X = :X, #E = :E, #U = :U",
            ExpressionAttributeValues=pipeline_expr_values,
            ExpressionAttributeNames=pipeline_expr_names,
            ReturnValues="UPDATED_NEW",
        )

    return True