public static void setQueuePermissionsToReceive()

in stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java [73:184]


    public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
                                                     final List<String> topicARNs ) throws Exception {

        // retrieve queue ARN and policy
        List<String> sqsAttrNames = Arrays.asList(QueueAttributeName.QueueArn.toString(),
            QueueAttributeName.Policy.toString());
        GetQueueAttributesRequest getQueueAttributesRequest =
            new GetQueueAttributesRequest( queueUrl ).withAttributeNames( sqsAttrNames );
        GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( getQueueAttributesRequest );
        Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
        String queueARN = sqsAttributeMap.get(QueueAttributeName.QueueArn.toString());
        String policyJson = sqsAttributeMap.get(QueueAttributeName.Policy.toString());

        // cannot send ARN in settings update, so remove it
        sqsAttributeMap.remove(QueueAttributeName.QueueArn.toString());

        // get existing policy from JSON
        Policy policy = policyJson != null && policyJson.length() > 0 ? Policy.fromJson(policyJson) : new Policy();

        // see if permissions already exist, and find ArnLike conditions
        boolean matchingConditionFound = false;
        boolean policyEdited = false;
        for (Statement statement : policy.getStatements()) {
            logger.info("statement id: {}, effect: {}, action: {}, resources:{}",
                statement.getId(), statement.getEffect().name(),
                statement.getActions().get(0).getActionName(),
                statement.getResources().get(0).getId());

            // must be Allow effect
            if (! statement.getEffect().name().equals(Statement.Effect.Allow.name())) {
                continue;
            }

            // must be SendMessage action
            boolean actionFound = false;
            for (Action action : statement.getActions()) {
                // do lower case comparison, since UI adds SQS.SendMessage but SDK uses sqs.SendMessage
                if (action.getActionName().toLowerCase().equals(SQSActions.SendMessage.getActionName().toLowerCase())) {
                    actionFound = true;
                    break;
                }
            }
            if (!actionFound) {
                continue;
            }

            // must be same queue resource
            boolean queueResourceFound = false;
            for (Resource resource : statement.getResources()) {
                if (resource.getId().equals(queueARN)) {
                    queueResourceFound = true;
                    break;
                }
            }
            if (!queueResourceFound) {
                continue;
            }

            // found matching statement, check conditions for source ARN
            for (Condition condition : statement.getConditions()) {
                if (logger.isTraceEnabled()) {
                    logger.trace("condition type: {}, conditionKey: {}", condition.getType(), condition.getConditionKey());
                }
                if (condition.getType().equals(ArnCondition.ArnComparisonType.ArnLike.name()) &&
                    condition.getConditionKey().equals(ConditionFactory.SOURCE_ARN_CONDITION_KEY)) {
                    matchingConditionFound = true;
                    for (String topicARN : topicARNs) {
                        if (! condition.getValues().contains(topicARN)) {
                            // topic doesn't exist, add it
                            policyEdited = true;
                            condition.getValues().add(topicARN);
                        }
                    }
                }
            }
        }

        if (!matchingConditionFound) {
            // never found ArnLike SourceArn condition, need to add a statement
            List<Condition> conditions = new ArrayList<>();

            for (String topicARN : topicARNs) {

                conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
            }

            Statement statement = new Statement(Statement.Effect.Allow)
                .withPrincipals(Principal.AllUsers)
                .withActions(SQSActions.SendMessage)
                .withResources(new Resource(queueARN));
            statement.setConditions(conditions);

            policy.getStatements().add(statement);
            policyEdited = true;
        }

        if (policyEdited) {
            sqsAttributeMap.put(QueueAttributeName.Policy.toString(), policy.toJson());

            // log if permissions are being updated
            logger.info("updating permissions for queueARN: {}, new policy: {}", queueARN, policy.toJson());

            SetQueueAttributesRequest setQueueAttributesRequest = new SetQueueAttributesRequest(queueUrl, sqsAttributeMap);

            try {
                sqs.setQueueAttributes(setQueueAttributesRequest);
            } catch (Exception e) {
                logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
                    topicARNs.toString(), e);
            }
        }
    }