in src/dfcx_scrapi/core/changelogs.py [0:0]
def list_changelogs(self, agent_id: str = None, **kwargs):
    """Lists all Change History logs for a CX Agent.

    This method supports log filtering via **kwargs input. Each supported
    keyword becomes an equality clause of the form `key = "value"`, and
    the clauses are combined with AND in the order the keywords were
    passed.

    The filters currently applied are: user_email, resource,
    display_name, type, and action.

    The create_time and create_time_epoch_seconds keywords are accepted
    but are NOT applied yet, because time filters are not being accepted
    properly. Any other keyword is not applied either. In both cases a
    warning is logged so the caller can tell that the returned logs are
    not filtered by that keyword.

    See https://github.com/googleapis/python-dialogflow-cx/blob/main/
    google/cloud/dialogflowcx_v3beta1/types/changelog.py#L40 for
    pointers on filter examples

    Args:
      agent_id: the formatted CX Agent ID
      **kwargs: optional filters as described above

    Returns:
      List of Change History logs
    """
    # Imported locally so the warnings below do not rely on the module's
    # import block.
    import logging  # pylint: disable=import-outside-toplevel

    # Keywords that map directly onto a `key = "value"` filter clause.
    equality_filters = (
        "user_email", "resource", "display_name", "type", "action")
    # BUG (pmarlow): Time filters not being accepted properly
    # TODO (pmarlow): implement input validation
    pending_time_filters = ("create_time", "create_time_epoch_seconds")

    request = types.changelog.ListChangelogsRequest()
    request.parent = agent_id

    filter_list = []
    for key, value in kwargs.items():
        if key in equality_filters:
            filter_list.append(f'{key} = "{value}"')
        elif key in pending_time_filters:
            logging.warning(
                "Changelog filter '%s' is not supported yet; ignoring it.",
                key)
        else:
            logging.warning(
                "Unknown changelog filter '%s'; ignoring it.", key)

    # Leave request.filter untouched when no clause was produced, so the
    # request is sent without any filter.
    if filter_list:
        request.filter = " AND ".join(filter_list)

    client_options = self._set_region(agent_id)
    client = services.changelogs.ChangelogsClient(
        credentials=self.creds, client_options=client_options
    )
    response = client.list_changelogs(request)

    # Flatten every page of the paginated response into a single list.
    return [log for page in response.pages for log in page.changelogs]