in benchmark/benchmark/instance.py [0:0]
def terminate_instances(self):
    """Terminate all testbed instances and delete the security groups.

    Instances in the 'pending', 'running', 'stopping' or 'stopped' state
    are looked up through ``self._get``. When none are found, a heading is
    printed and the method returns without issuing any further request
    (the security groups are left untouched in that case). Otherwise a
    terminate request is sent through each client that has instances,
    ``self._wait(['shutting-down'])`` is called, and the group named
    ``self.SECURITY_GROUP_NAME`` is deleted through every client.

    Raises:
        BenchError: If a ``ClientError`` is raised at any step. The
            original error is wrapped in ``AWSError`` and also chained
            as the explicit cause.
    """
    try:
        # `ids` is indexed by the same keys as `self.clients`; each value
        # is the collection of instance ids passed to that client.
        ids, _ = self._get(['pending', 'running', 'stopping', 'stopped'])
        size = sum(len(x) for x in ids.values())
        if size == 0:
            Print.heading('All instances are shut down')
            return

        # Terminate instances, skipping clients with nothing to terminate.
        for region, client in self.clients.items():
            if ids[region]:
                client.terminate_instances(InstanceIds=ids[region])

        # Wait for all instances to properly shut down.
        # NOTE(review): presumably `_wait` blocks while any instance is
        # still in the given state -- confirm against its definition.
        Print.info('Waiting for all instances to shut down...')
        self._wait(['shutting-down'])

        # Security groups are removed only after the wait has returned.
        for client in self.clients.values():
            client.delete_security_group(
                GroupName=self.SECURITY_GROUP_NAME
            )

        Print.heading(f'Testbed of {size} instances destroyed')
    except ClientError as e:
        # Chain explicitly so the AWS error is reported as the direct cause.
        raise BenchError(
            'Failed to terminate instances', AWSError(e)
        ) from e