in stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java [182:332]
/**
 * Sets up the SNS topic and SQS queue named {@code queueName} and subscribes the queue to the topic.
 *
 * <p>Steps performed:
 * <ol>
 *   <li>Look up the primary topic ARN (per the inline note below, {@code getTopicArn} creates the
 *       topic when it is missing) and the primary queue ARN, creating the queue when its ARN
 *       lookup returns {@code null}.</li>
 *   <li>Subscribe the primary queue to the primary topic and grant the topic permission to send
 *       to the queue. An {@code AmazonServiceException} here is logged and NOT rethrown.</li>
 *   <li>When multi-region is enabled and the scope's region implementation is {@code ALL}:
 *       resolve/create the topic and queue in every configured region, then subscribe every
 *       queue to every topic and set each queue's receive permissions for the full topic list.
 *       Individual subscription failures are logged and do not abort the remaining ones.</li>
 * </ol>
 *
 * <p>Within one invocation a single SQS client and a single SNS client are kept per region name
 * and reused, rather than building a new client for every queue/topic pair.
 * NOTE(review): this assumes {@code createSQSClient}/{@code createSNSClient} return
 * interchangeable clients for the same region -- confirm against their implementations.
 *
 * @param queueName name used for both the SNS topic and the SQS queue in every region
 * @return the ARN of the primary SNS topic
 * @throws IllegalArgumentException if an entry of the configured region list is not a valid region name
 * @throws Exception any other failure propagated from the AWS helper calls
 */
private String setupTopics( final String queueName ) throws Exception {

    logger.info( "Setting up setupTopics SNS/SQS..." );

    String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );

    if ( logger.isTraceEnabled() ) {
        logger.trace( "SNS/SQS Setup: primaryTopicArn={}", primaryTopicArn );
    }

    String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
    String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );

    if ( logger.isTraceEnabled() ) {
        logger.trace( "SNS/SQS Setup: primaryQueueArn={}", primaryQueueArn );
    }

    // a null ARN is treated as "queue does not exist yet"
    if ( primaryQueueArn == null ) {
        if ( logger.isTraceEnabled() ) {
            logger.trace( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
        }

        queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
        primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );

        if ( logger.isTraceEnabled() ) {
            logger.trace( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
        }
    }

    try {
        SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
        sns.subscribe( primarySubscribeRequest );

        // ensure the SNS primary topic has permission to send to the primary SQS queue
        List<String> primaryTopicArnList = new ArrayList<>();
        primaryTopicArnList.add( primaryTopicArn );
        AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
    }
    catch ( AmazonServiceException e ) {
        // best effort: a failed primary subscription is reported but does not abort setup
        logger.error(
            "Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e );
    }

    if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) {

        String multiRegion = fig.getRegionList();

        if ( logger.isTraceEnabled() ) {
            logger.trace( "MultiRegion Setup specified, regions: [{}]", multiRegion );
        }

        String[] regionNames = multiRegion.split( "," );

        // ARN -> region name; keyed by ARN so a region listed twice (or the primary region
        // appearing in the list) collapses to a single entry
        final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
        final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );

        // region name -> client, so each region's clients are built once per invocation
        final Map<String, AmazonSQSClient> sqsClientsByRegion = new HashMap<>( regionNames.length + 1 );
        final Map<String, AmazonSNSClient> snsClientsByRegion = new HashMap<>( regionNames.length + 1 );

        arrQueueArns.put( primaryQueueArn, fig.getPrimaryRegion() );
        topicArns.put( primaryTopicArn, fig.getPrimaryRegion() );

        for ( String rawRegionName : regionNames ) {

            final String regionName = rawRegionName.trim();
            final Region region;

            try {
                Regions regions = Regions.fromName( regionName );
                region = Region.getRegion( regions );
            }
            catch ( IllegalArgumentException e ) {
                throw new IllegalArgumentException( "INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_QUEUE_REGION_LIST + ": " + regionName, e );
            }

            AmazonSQSClient sqsClient = sqsClientsByRegion.get( regionName );
            if ( sqsClient == null ) {
                sqsClient = createSQSClient( region );
                sqsClientsByRegion.put( regionName, sqsClient );
            }

            // do this stuff synchronously
            AmazonSNSClient snsClient = snsClientsByRegion.get( regionName );
            if ( snsClient == null ) {
                snsClient = createSNSClient( region );
                snsClientsByRegion.put( regionName, snsClient );
            }

            // getTopicArn will create the SNS topic if it doesn't exist
            String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
            topicArns.put( topicArn, regionName );

            // create the SQS queue if it doesn't exist; a dedicated local is used so the
            // primary queue URL held in queueUrl is never overwritten by a remote region's URL
            String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
            if ( queueArn == null ) {
                String regionQueueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
                queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, regionQueueUrl );
            }

            arrQueueArns.put( queueArn, regionName );
        }

        if ( logger.isTraceEnabled() ) {
            logger.trace( "Creating Subscriptions..." );
        }

        for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {

            String queueARN = queueArnEntry.getKey();
            String strSqsRegion = queueArnEntry.getValue();

            // reuse the client built above; only a region not seen yet (e.g. the primary
            // region when it is absent from the configured list) needs a new one
            AmazonSQSClient subscribeSqsClient = sqsClientsByRegion.get( strSqsRegion );
            if ( subscribeSqsClient == null ) {
                Regions sqsRegions = Regions.fromName( strSqsRegion );
                Region sqsRegion = Region.getRegion( sqsRegions );
                subscribeSqsClient = createSQSClient( sqsRegion );
                sqsClientsByRegion.put( strSqsRegion, subscribeSqsClient );
            }

            // ensure the URL used to subscribe is for the correct name/region
            String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );

            // every queue is granted receive permission for every topic, whether or not the
            // individual subscribe call below succeeds; a fresh list is handed out per queue
            List<String> topicArnList = new ArrayList<>( topicArns.keySet() );

            for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {

                String topicARN = topicArnEntry.getKey();
                String strSnsRegion = topicArnEntry.getValue();

                // do this stuff synchronously
                AmazonSNSClient subscribeSnsClient = snsClientsByRegion.get( strSnsRegion );
                if ( subscribeSnsClient == null ) {
                    Regions snsRegions = Regions.fromName( strSnsRegion );
                    Region snsRegion = Region.getRegion( snsRegions );
                    subscribeSnsClient = createSNSClient( snsRegion );
                    snsClientsByRegion.put( strSnsRegion, subscribeSnsClient );
                }

                SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );

                try {
                    logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
                        strSqsRegion, topicARN, strSnsRegion );

                    SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
                    String subscriptionARN = subscribeResult.getSubscriptionArn();

                    if ( logger.isTraceEnabled() ) {
                        logger.trace(
                            "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
                            queueARN, topicARN, subscriptionARN );
                    }
                }
                catch ( Exception e ) {
                    // best effort: one failed pairing must not prevent the remaining subscriptions
                    logger.error( "ERROR Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
                        queueARN, strSqsRegion, topicARN, strSnsRegion , e );
                }
            }

            if ( logger.isTraceEnabled() ) {
                logger.trace( "Adding permission to receive messages..." );
            }

            // add permission to each queue, providing a list of topics that it's subscribed to
            AmazonNotificationUtils
                .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
        }
    }

    return primaryTopicArn;
}