def describe_stack_events()

in integration/helpers/deployer/deployer.py [0:0]


    def describe_stack_events(self, stack_name, time_stamp_marker, **kwargs):
        """
        Calls CloudFormation to get current stack events
        :param stack_name: Name or ID of the stack
        :param time_stamp_marker: last event time on the stack to start streaming events from.
        :return:
        """

        stack_change_in_progress = True
        events = set()
        retry_attempts = 0

        while stack_change_in_progress and retry_attempts <= self.max_attempts:
            try:

                # Only sleep if there have been no retry_attempts
                time.sleep(self.client_sleep if retry_attempts == 0 else 0)
                describe_stacks_resp = self._client.describe_stacks(StackName=stack_name)
                paginator = self._client.get_paginator("describe_stack_events")
                response_iterator = paginator.paginate(StackName=stack_name)
                stack_status = describe_stacks_resp["Stacks"][0]["StackStatus"]
                latest_time_stamp_marker = time_stamp_marker
                for event_items in response_iterator:
                    for event in event_items["StackEvents"]:
                        if event["EventId"] not in events and utc_to_timestamp(event["Timestamp"]) > time_stamp_marker:
                            events.add(event["EventId"])
                            latest_time_stamp_marker = max(
                                latest_time_stamp_marker, utc_to_timestamp(event["Timestamp"])
                            )
                            row_color = self.deploy_color.get_stack_events_status_color(status=event["ResourceStatus"])
                            pprint_columns(
                                columns=[
                                    event["ResourceStatus"],
                                    event["ResourceType"],
                                    event["LogicalResourceId"],
                                    event.get("ResourceStatusReason", "-"),
                                ],
                                width=kwargs["width"],
                                margin=kwargs["margin"],
                                format_string=DESCRIBE_STACK_EVENTS_FORMAT_STRING,
                                format_args=kwargs["format_args"],
                                columns_dict=DESCRIBE_STACK_EVENTS_DEFAULT_ARGS.copy(),
                                color=row_color,
                            )
                        # Skip already shown old event entries
                        elif utc_to_timestamp(event["Timestamp"]) <= time_stamp_marker:
                            time_stamp_marker = latest_time_stamp_marker
                            break
                    else:  # go to next loop if not break from inside loop
                        time_stamp_marker = latest_time_stamp_marker  # update marker if all events are new
                        continue
                    break  # reached here only if break from inner loop!

                if self._check_stack_complete(stack_status):
                    stack_change_in_progress = False
                    break
            except botocore.exceptions.ClientError as ex:
                retry_attempts = retry_attempts + 1
                if retry_attempts > self.max_attempts:
                    LOG.error("Describing stack events for %s failed: %s", stack_name, str(ex))
                    return
                # Sleep in exponential backoff mode
                time.sleep(math.pow(self.backoff, retry_attempts))