in migration/bring-your-own-role/byor.py [0:0]
def wait_for_subscription_grant_deletion(datazone, domain_id, grant_id, max_attempts=30, delay_seconds=5):
"""
Wait for subscription grant deletion to complete
Args:
datazone: DataZone client
domain_id: Domain identifier
grant_id: Subscription grant identifier
max_attempts: Maximum number of polling attempts
delay_seconds: Delay between polling attempts in seconds
Returns:
True if deletion is successful, False otherwise
"""
for attempt in range(max_attempts):
try:
response = datazone.get_subscription_grant(
domainIdentifier=domain_id,
identifier=grant_id
)
status = response.get('status')
if status == 'COMPLETED':
print(f"Deleted subscription grant `{grant_id}` successfully")
return True
elif status in ['REVOKE_FAILED', 'GRANT_AND_REVOKE_FAILED']:
print(f"Deletion failed with status: {status}")
return False
print(f"Deletion of subscription grant: `{grant_id}` in progress. Current status: {status}. Attempt {attempt + 1}/{max_attempts}")
time.sleep(delay_seconds)
except ClientError as e:
if e.response['Error']['Code'] == 'ResourceNotFoundException':
print(f"Subscription grant {grant_id} no longer exists")
return True
raise
raise TimeoutError(f"Deletion of subscription grant: `{grant_id}` did not complete after {max_attempts} attempts")