private String setupTopics()

in stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java [182:332]


    /**
     * Sets up the SNS topic and SQS queue for {@code queueName} and wires them together.
     *
     * <p>Steps performed:
     * <ol>
     *   <li>Looks up the primary topic ARN via {@code AmazonNotificationUtils.getTopicArn(sns, queueName, true)}.</li>
     *   <li>Looks up the primary queue URL/ARN; when the ARN lookup returns {@code null} the queue is created.</li>
     *   <li>Subscribes the primary queue to the primary topic and sets the queue permissions for that topic.
     *       An {@code AmazonServiceException} raised here is logged and NOT rethrown (best effort).</li>
     *   <li>When {@code fig.isMultiRegion()} is true and the scope's region implementation is {@code ALL}:
     *       for every region in {@code fig.getRegionList()} (comma separated) the topic and queue are
     *       looked up/created, then every collected queue is subscribed to every collected topic and the
     *       queue permissions are set for the full topic list. Individual subscription failures are
     *       logged and skipped.</li>
     * </ol>
     *
     * @param queueName name used for both the SNS topic and the SQS queue
     * @return the ARN of the primary SNS topic
     * @throws IllegalArgumentException if a configured region name is not a known AWS region
     * @throws Exception declared by the original signature; anything not handled above propagates
     */
    private String setupTopics( final String queueName ) throws Exception {

        logger.info( "Setting up setupTopics SNS/SQS..." );

        String primaryTopicArn = AmazonNotificationUtils.getTopicArn( sns, queueName, true );

        if ( logger.isTraceEnabled() ) {
            logger.trace( "SNS/SQS Setup: primaryTopicArn={}", primaryTopicArn );
        }

        String queueUrl = AmazonNotificationUtils.getQueueUrlByName( sqs, queueName );
        String primaryQueueArn = AmazonNotificationUtils.getQueueArnByName( sqs, queueName );

        if ( logger.isTraceEnabled() ) {
            logger.trace( "SNS/SQS Setup: primaryQueueArn={}", primaryQueueArn );
        }

        if ( primaryQueueArn == null ) {
            if ( logger.isTraceEnabled() ) {
                logger.trace( "SNS/SQS Setup: primaryQueueArn is null, creating queue..." );
            }

            queueUrl = AmazonNotificationUtils.createQueue( sqs, queueName, fig );
            primaryQueueArn = AmazonNotificationUtils.getQueueArnByUrl( sqs, queueUrl );

            if ( logger.isTraceEnabled() ) {
                logger.trace( "SNS/SQS Setup: New Queue URL=[{}] ARN=[{}]", queueUrl, primaryQueueArn );
            }
        }

        try {

            SubscribeRequest primarySubscribeRequest = new SubscribeRequest( primaryTopicArn, "sqs", primaryQueueArn );
            sns.subscribe( primarySubscribeRequest );

            // ensure the SNS primary topic has permission to send to the primary SQS queue
            List<String> primaryTopicArnList = new ArrayList<>();
            primaryTopicArnList.add( primaryTopicArn );
            AmazonNotificationUtils.setQueuePermissionsToReceive( sqs, queueUrl, primaryTopicArnList );
        }
        catch ( AmazonServiceException e ) {
            // best effort: a failed primary subscription is reported but does not abort setup
            logger.error(
                "Unable to subscribe PRIMARY queue=[{}] to topic=[{}]", queueUrl, primaryTopicArn, e );
        }

        if ( fig.isMultiRegion() && scope.getRegionImplementation() == LegacyQueueScope.RegionImplementation.ALL ) {

            String multiRegion = fig.getRegionList();

            if ( logger.isTraceEnabled() ) {
                logger.trace( "MultiRegion Setup specified, regions: [{}]", multiRegion );
            }

            String[] regionNames = multiRegion.split( "," );

            // both maps are keyed by ARN, with the owning region name as the value
            final Map<String, String> arrQueueArns = new HashMap<>( regionNames.length + 1 );
            final Map<String, String> topicArns = new HashMap<>( regionNames.length + 1 );

            // one client per region name, reused by the subscription pass below instead of
            // constructing a fresh client for every (queue, topic) combination
            final Map<String, AmazonSQSClient> sqsClients = new HashMap<>( regionNames.length + 1 );
            final Map<String, AmazonSNSClient> snsClients = new HashMap<>( regionNames.length + 1 );

            arrQueueArns.put(primaryQueueArn, fig.getPrimaryRegion());
            topicArns.put(primaryTopicArn, fig.getPrimaryRegion());

            for ( String regionName : regionNames ) {

                regionName = regionName.trim();
                final Region region;
                try {
                    Regions regions = Regions.fromName(regionName);
                    region = Region.getRegion(regions);
                }
                catch (IllegalArgumentException e) {
                    throw new IllegalArgumentException("INVALID REGION FROM CONFIGURATION " + LegacyQueueFig.USERGRID_QUEUE_REGION_LIST + ": " + regionName, e);
                }

                AmazonSQSClient sqsClient = sqsClients.get( regionName );
                if ( sqsClient == null ) {
                    sqsClient = createSQSClient( region );
                    sqsClients.put( regionName, sqsClient );
                }

                AmazonSNSClient snsClient = snsClients.get( regionName );
                if ( snsClient == null ) {
                    snsClient = createSNSClient( region ); // do this stuff synchronously
                    snsClients.put( regionName, snsClient );
                }

                // getTopicArn will create the SNS topic if it doesn't exist
                String topicArn = AmazonNotificationUtils.getTopicArn( snsClient, queueName, true );
                topicArns.put( topicArn, regionName );

                // create the SQS queue if it doesn't exist
                String queueArn = AmazonNotificationUtils.getQueueArnByName( sqsClient, queueName );
                if ( queueArn == null ) {
                    // keep the regional URL local so the primary queueUrl above is never overwritten
                    String regionQueueUrl = AmazonNotificationUtils.createQueue( sqsClient, queueName, fig );
                    queueArn = AmazonNotificationUtils.getQueueArnByUrl( sqsClient, regionQueueUrl );
                }

                arrQueueArns.put( queueArn, regionName );
            }

            if (logger.isTraceEnabled()) {
                logger.trace("Creating Subscriptions...");
            }

            for ( Map.Entry<String, String> queueArnEntry : arrQueueArns.entrySet() ) {
                String queueARN = queueArnEntry.getKey();
                String strSqsRegion = queueArnEntry.getValue();

                // the primary region may not appear in the configured list, so create lazily
                AmazonSQSClient subscribeSqsClient = sqsClients.get( strSqsRegion );
                if ( subscribeSqsClient == null ) {
                    Regions sqsRegions = Regions.fromName( strSqsRegion );
                    Region sqsRegion = Region.getRegion( sqsRegions );
                    subscribeSqsClient = createSQSClient( sqsRegion );
                    sqsClients.put( strSqsRegion, subscribeSqsClient );
                }

                // ensure the URL used to subscribe is for the correct name/region
                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName( subscribeSqsClient, queueName );

                // this list used later for adding permissions to queues
                List<String> topicArnList = new ArrayList<>( topicArns.keySet() );

                for ( Map.Entry<String, String> topicArnEntry : topicArns.entrySet() ) {

                    String topicARN = topicArnEntry.getKey();
                    String strSnsRegion = topicArnEntry.getValue();

                    AmazonSNSClient subscribeSnsClient = snsClients.get( strSnsRegion );
                    if ( subscribeSnsClient == null ) {
                        Regions snsRegions = Regions.fromName( strSnsRegion );
                        Region snsRegion = Region.getRegion( snsRegions );
                        subscribeSnsClient = createSNSClient( snsRegion ); // do this stuff synchronously
                        snsClients.put( strSnsRegion, subscribeSnsClient );
                    }

                    SubscribeRequest subscribeRequest = new SubscribeRequest( topicARN, "sqs", queueARN );

                    try {

                        logger.info( "Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]", queueARN,
                            strSqsRegion, topicARN, strSnsRegion );

                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe( subscribeRequest );
                        String subscriptionARN = subscribeResult.getSubscriptionArn();
                        if ( logger.isTraceEnabled() ) {
                            logger.trace(
                                "Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]",
                                queueARN, topicARN, subscriptionARN );
                        }
                    }
                    catch ( Exception e ) {
                        // one failed pairing must not prevent the remaining subscriptions
                        logger.error( "ERROR Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
                                queueARN, strSqsRegion, topicARN, strSnsRegion , e );
                    }
                }

                if (logger.isTraceEnabled()) {
                    logger.trace("Adding permission to receive messages...");
                }
                // add permission to each queue, providing a list of topics that it's subscribed to
                AmazonNotificationUtils
                    .setQueuePermissionsToReceive( subscribeSqsClient, subscribeQueueUrl, topicArnList );
            }
        }

        return primaryTopicArn;
    }