in src/python/detectors/sns_no_bind_subscribe_publish_rule/sns_no_bind_subscribe_publish_rule.py [0:0]
def sns_publish_noncompliant(self, sqs_arn: str, topic_arn: str) -> None:
import boto3
session = boto3.Session()
sns_client = session.client('sns')
sns_client.subscribe(TopicArn=topic_arn, Protocol='sqs',
Endpoint=sqs_arn,
ReturnSubscriptionArn=True)
# Noncompliant: incorrect binding of SNS publish operations
# with 'subscribe' or 'create_topic' operations.
sns_client.publish(TopicArn=topic_arn,
Message='test message for SQS',
MessageAttributes={'attr1': {
'DataType': 'String',
'StringValue': "short_uid"
}
}
)