in source/aws/services/transit_gateway_peering_attachments.py [0:0]
def describe_transit_gateway_peering_attachments(self,
tgw_id: str,
states: list) -> list:
"""
Describe the tgw peering attachments for the tagged tgw id
:param tgw_id: tgw id of the tagged transit gateway
:param states: use the state to limit the returned response
:return: list of transit gateway peering attachments
"""
try:
response = self.ec2_client\
.describe_transit_gateway_peering_attachments(
Filters=[
{
'Name': 'transit-gateway-id',
'Values': [tgw_id]
},
{
'Name': 'state',
'Values': states
}
]
)
transit_gateway_peering_attachments_list = response.get(
'TransitGatewayPeeringAttachments', [])
next_token = response.get('NextToken', None)
while next_token is not None:
self.logger.info("Handling Next Token: {}".format(next_token))
response = self.ec2_client\
.describe_transit_gateway_peering_attachments(
Filters=[
{
'Name': 'transit-gateway-id',
'Values': [tgw_id]
},
{
'Name': 'state',
'Values': states
}
],
NextToken=next_token)
self.logger.info("Extending TGW Peering Attachment List")
transit_gateway_peering_attachments_list \
.extend(response.get('TransitGatewayPeeringAttachments',
[]))
next_token = response.get('NextToken', None)
return transit_gateway_peering_attachments_list
except ClientError as error:
self.logger.log_unhandled_exception(error)
raise