def update_pipeline_execution()

in sdlf-datalakeLibrary/python/datalake_library/octagon/peh.py [0:0]


    def update_pipeline_execution(self, status, component=None, issue_comment=None):
        """Record a status change on the current pipeline execution.

        Appends an entry to the execution record's ``history`` list, updates
        its status/timestamp attributes and sets ``version`` to the value read
        from the record plus one. When ``status`` is one of COMPLETED,
        CANCELED or FAILED the record is additionally marked inactive and
        given an end timestamp, a success flag and a duration. When ``status``
        is COMPLETED the owning pipeline's ``last_execution_*`` attributes are
        updated as well.

        Args:
            status: New execution status (compared against the
                ``PEH_STATUS_*`` constants and concatenated into
                ``status_last_updated_timestamp``).
            component: Optional component name; stored in the history entry
                only when truthy.
            issue_comment: Optional comment; stored only when
                ``is_not_empty(issue_comment)`` holds.

        Returns:
            True once all updates have been issued.

        Raises:
            Whatever ``throw_if_false`` raises when no pipeline execution is
            assigned to the client or the execution record is missing or
            inactive (presumably a validation error - confirm in its
            definition).
            KeyError: from ``["Item"]`` if the pipelines-table lookup returns
                no item (assumes a boto3 Table resource - TODO confirm).
        """
        self.logger.debug("peh update_pipeline_execution() called")

        throw_if_false(self.client.is_pipeline_set(), "Pipeline execution is not yet assigned")
        peh_id = self.client.pipeline_execution_id

        peh_rec = self.get_peh_record(peh_id)
        is_active = peh_rec["active"] if peh_rec else False
        throw_if_false(is_active, "Pipeline execution is not active")

        version = peh_rec["version"]
        start_time = peh_rec["start_timestamp"]

        current_time = datetime.datetime.utcnow()
        utc_time_iso = get_timestamp_iso(current_time)
        local_date_iso = get_local_date()

        # One history entry per status change; "component" is included only
        # when the caller supplied one.
        history_entry = {"status": status, "timestamp": utc_time_iso}
        if component:
            history_entry["component"] = component

        # Attributes written on every status update.
        # NOTE(review): the new version is derived from the value read above
        # and the write carries no ConditionExpression - confirm whether
        # concurrent updates are possible here.
        expr_names = {
            "#H": "history",
            "#St": "status",
            "#V": "version",
            "#LUT": "last_updated_timestamp",
            "#STT": "status_last_updated_timestamp",
        }
        expr_values = {
            ":H": [history_entry],
            ":St": status,
            ":STT": status + "#" + utc_time_iso,
            ":LUT": utc_time_iso,
            ":INC": 1,
            ":V": version,
        }

        if status in (PEH_STATUS_COMPLETED, PEH_STATUS_CANCELED, PEH_STATUS_FAILED):
            # Terminal status: close the execution out.
            duration_sec = get_duration_sec(start_time, utc_time_iso)

            expr_names.update(
                {
                    "#A": "active",
                    "#ETS": "end_timestamp",
                    "#S": "success",
                    "#D": "duration_in_seconds",
                }
            )
            expr_values.update(
                {
                    ":ETS": utc_time_iso,
                    ":A": False,
                    ":S": status == PEH_STATUS_COMPLETED,
                    # Converted via str to Decimal before being written.
                    ":D": Decimal(str(duration_sec)),
                }
            )
            update_expr = (
                "SET #H = list_append(#H, :H), #S = :S, #V = :V + :INC,"
                "#LUT = :LUT, #A = :A, #St = :St, #ETS = :ETS, #D = :D,"
                "#STT = :STT"
            )
            comment_attribute = "issue_comment"
        else:
            update_expr = "SET #H = list_append(#H, :H), #St = :St, #STT = :STT, #V = :V + :INC, #LUT = :LUT"
            # NOTE(review): non-terminal updates store the comment under
            # "comment" while terminal updates use "issue_comment". Kept
            # as-is to avoid changing the stored schema - confirm which
            # attribute name is intended.
            comment_attribute = "comment"

        if is_not_empty(issue_comment):
            expr_names["#C"] = comment_attribute
            expr_values[":C"] = issue_comment
            update_expr += ", #C = :C"

        self.peh_table.update_item(
            Key={"id": peh_id},
            UpdateExpression=update_expr,
            ExpressionAttributeValues=expr_values,
            ExpressionAttributeNames=expr_names,
            ReturnValues="UPDATED_NEW",
        )

        # Add pipeline update for COMPLETED Executions
        if status == PEH_STATUS_COMPLETED:
            pipeline_name = self.client.pipeline_name
            self.logger.debug(f"Pipeline: {pipeline_name}")

            pipeline_item = self.pipelines_table.get_item(
                Key={"name": pipeline_name}, ConsistentRead=True, AttributesToGet=["name", "version"]
            )["Item"]

            pipeline_names = {
                "#V": "version",
                "#U": "last_updated_timestamp",
                "#P": "last_execution_id",
                "#D": "last_execution_date",
                "#E": "last_execution_timestamp",
                "#S": "last_execution_status",
                "#X": "last_execution_duration_in_seconds",
            }
            pipeline_values = {
                ":V": pipeline_item["version"],
                ":INC": 1,
                ":S": status,
                ":P": self.client.pipeline_execution_id,
                ":D": local_date_iso,
                ":E": utc_time_iso,
                ":U": utc_time_iso,
                # duration_sec is always bound here: COMPLETED is one of the
                # terminal statuses handled above.
                ":X": Decimal(str(duration_sec)),
            }

            self.pipelines_table.update_item(
                Key={"name": pipeline_name},
                UpdateExpression="SET #P = :P, #V = :V + :INC, #S = :S, #D = :D, #X = :X, #E = :E, #U = :U",
                ExpressionAttributeValues=pipeline_values,
                ExpressionAttributeNames=pipeline_names,
                ReturnValues="UPDATED_NEW",
            )

        return True