in awscli/customizations/cloudtrail/subscribe.py [0:0]
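def setup_new_topic(self, topic, custom_policy=None):
    """
    Creates a new SNS topic with an appropriate policy to let CloudTrail
    post messages to the topic.

    :param topic: Name of the SNS topic to create.
    :param custom_policy: Optional policy document (string) used in place
        of ``SNS_POLICY_TEMPLATE``. In both cases the ``<Region>``,
        ``<SNSTopicOwnerAccountId>`` and ``<SNSTopicName>`` placeholders
        are substituted before the policy is applied.
    :return: The ``create_topic`` response dict (contains ``TopicArn``).
    :raises Exception: If a topic with the same name already exists, or if
        applying the policy fails (the newly created topic is deleted
        before re-raising).
    """
    sys.stdout.write(
        'Setting up new SNS topic {topic}...\n'.format(topic=topic))
    account_id = get_account_id(self.sts)
    # Make sure topic doesn't already exist. ListTopics is paginated, so
    # walk every page -- looking only at the first page could miss the
    # topic. Warn but do not fail if ListTopics permissions are missing
    # from the IAM role.
    try:
        paginator = self.sns.get_paginator('list_topics')
        topics = [t for page in paginator.paginate()
                  for t in page['Topics']]
    except Exception:
        topics = []
        LOG.warning('Unable to list topics, continuing...')
    if any(t['TopicArn'].split(':')[-1] == topic for t in topics):
        raise Exception('Topic {topic} already exists.'.format(
            topic=topic))
    region = self.sns.meta.region_name
    # Get the SNS topic policy information to allow CloudTrail
    # write-access.
    if custom_policy is not None:
        policy = custom_policy
    else:
        policy = self._get_policy(SNS_POLICY_TEMPLATE)
    policy = policy.replace('<Region>', region)\
        .replace('<SNSTopicOwnerAccountId>', account_id)\
        .replace('<SNSTopicName>', topic)
    topic_result = self.sns.create_topic(Name=topic)
    topic_arn = topic_result['TopicArn']
    try:
        # Merge any existing topic policy with our new policy statements
        topic_attr = self.sns.get_topic_attributes(TopicArn=topic_arn)
        policy = self.merge_sns_policy(topic_attr['Attributes']['Policy'],
                                       policy)
        # Lazy %-formatting: the policy is only rendered if DEBUG is on.
        LOG.debug('Topic policy:\n%s', policy)
        # Set the topic policy
        self.sns.set_topic_attributes(TopicArn=topic_arn,
                                      AttributeName='Policy',
                                      AttributeValue=policy)
    except Exception:
        # Roll back topic creation.
        # NOTE(review): if the existence check above was skipped because
        # listing failed, and the name was already taken, this rollback
        # removes that pre-existing topic (CreateTopic hands back the
        # existing ARN for a matching name) -- confirm that is acceptable.
        self.sns.delete_topic(TopicArn=topic_arn)
        raise
    return topic_result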