def list_changelogs()

in src/dfcx_scrapi/core/changelogs.py [0:0]


    def list_changelogs(self, agent_id: str = None, **kwargs):
        """Lists all Change History logs for a CX Agent.

        Logs can be narrowed down by passing filters as keyword arguments.
        Each applied filter is rendered as `<field> = "<value>"` and
        multiple filters are combined with `AND`, in the order they were
        passed. The filters currently applied are: user_email, resource,
        display_name, type, and action.

        The time based filters create_time and create_time_epoch_seconds
        are recognized but NOT applied yet (see the BUG note in the code).
        Those keywords, as well as any unknown keyword, are skipped and a
        warning is logged so the caller is not surprised by results that
        are broader than expected.

        See https://github.com/googleapis/python-dialogflow-cx/blob/main/
        google/cloud/dialogflowcx_v3beta1/types/changelog.py#L40 for
        pointers on filter examples

        Args:
          agent_id: the formatted CX Agent ID
          **kwargs: optional log filters keyed by field name (see above)

        Returns:
          List of Change History logs gathered from every response page
        """
        # Local import: keeps this method self-contained regardless of what
        # the module already imports at the top of the file.
        import logging

        # Filters that are forwarded to the API as `<field> = "<value>"`.
        supported_filters = (
            "user_email",
            "resource",
            "display_name",
            "type",
            "action",
        )
        # BUG (pmarlow): Time filters not being accepted properly
        # TODO (pmarlow): implement input validation
        pending_filters = ("create_time", "create_time_epoch_seconds")

        request = types.changelog.ListChangelogsRequest()
        request.parent = agent_id

        filter_list = []
        for key, value in kwargs.items():
            if key in supported_filters:
                # NOTE(review): the value is interpolated verbatim; a value
                # containing a double quote presumably yields an invalid
                # filter expression - confirm before adding escaping.
                filter_list.append(f'{key} = "{value}"')
            elif key in pending_filters:
                logging.warning(
                    "Changelog filter `%s` is not supported yet and was "
                    "ignored.",
                    key,
                )
            else:
                logging.warning(
                    "Unknown changelog filter `%s` was ignored.", key
                )

        # Leave `request.filter` untouched when nothing usable was provided.
        if filter_list:
            request.filter = " AND ".join(filter_list)

        client_options = self._set_region(agent_id)
        client = services.changelogs.ChangelogsClient(
            credentials=self.creds, client_options=client_options
        )

        response = client.list_changelogs(request)

        # Flatten the paged response into a single list of changelogs.
        return [log for page in response.pages for log in page.changelogs]