in stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java [73:184]
/**
 * Makes sure the queue's access policy allows the given topics to send messages to it.
 *
 * <p>The queue's ARN and current policy are read with {@code getQueueAttributes}. Every
 * {@code Allow} statement that grants {@code SendMessage} on this queue and carries an
 * {@code ArnLike} source-ARN condition gets any missing topic ARNs appended to that condition.
 * If no such condition exists anywhere in the policy, a new statement is added that allows
 * {@code SendMessage} on the queue for all principals, restricted by one source-ARN condition
 * per topic. The policy is written back only when it was actually changed.
 *
 * <p>A failure while writing the policy back is logged and not rethrown.
 *
 * @param sqs       client used to read and update the queue attributes
 * @param queueUrl  URL of the queue whose policy is inspected and, if needed, updated
 * @param topicARNs ARNs of the topics that must be allowed to send to the queue; when
 *                  {@code null} or empty the method logs a warning and changes nothing
 * @throws Exception if reading the queue attributes or processing the existing policy fails
 */
public static void setQueuePermissionsToReceive( final AmazonSQSClient sqs, final String queueUrl,
                                                 final List<String> topicARNs ) throws Exception {

    // With no topics there is nothing to grant. Returning here matters: the "add a statement"
    // path below would otherwise create an Allow/AllUsers/SendMessage statement with no
    // source-ARN condition at all, i.e. an unrestricted grant on the queue.
    if ( topicARNs == null || topicARNs.isEmpty() ) {
        logger.warn( "No topic ARNs supplied, leaving permissions unchanged for queue URL: {}", queueUrl );
        return;
    }

    // retrieve queue ARN and policy
    List<String> sqsAttrNames = Arrays.asList( QueueAttributeName.QueueArn.toString(),
        QueueAttributeName.Policy.toString() );
    GetQueueAttributesRequest getQueueAttributesRequest =
        new GetQueueAttributesRequest( queueUrl ).withAttributeNames( sqsAttrNames );
    GetQueueAttributesResult queueAttributesResult = sqs.getQueueAttributes( getQueueAttributesRequest );
    Map<String, String> sqsAttributeMap = queueAttributesResult.getAttributes();
    String queueARN = sqsAttributeMap.get( QueueAttributeName.QueueArn.toString() );
    String policyJson = sqsAttributeMap.get( QueueAttributeName.Policy.toString() );

    // cannot send ARN in settings update, so remove it
    sqsAttributeMap.remove( QueueAttributeName.QueueArn.toString() );

    // get existing policy from JSON, or start from an empty policy if the queue has none
    Policy policy = policyJson != null && policyJson.length() > 0 ? Policy.fromJson( policyJson ) : new Policy();

    // see if permissions already exist, and find ArnLike conditions
    boolean matchingConditionFound = false;
    boolean policyEdited = false;

    for ( Statement statement : policy.getStatements() ) {
        final List<Action> actions = statement.getActions();
        final List<Resource> resources = statement.getResources();

        // Only the first action/resource is logged; guard against statements that have none
        // so that a log line can never abort the permission update.
        final String firstAction =
            actions == null || actions.isEmpty() ? null : actions.get( 0 ).getActionName();
        final String firstResource =
            resources == null || resources.isEmpty() ? null : resources.get( 0 ).getId();
        logger.info( "statement id: {}, effect: {}, action: {}, resources:{}",
            statement.getId(), statement.getEffect().name(), firstAction, firstResource );

        // must be Allow effect
        if ( statement.getEffect() != Statement.Effect.Allow ) {
            continue;
        }

        // must be SendMessage action
        boolean actionFound = false;
        if ( actions != null ) {
            for ( Action action : actions ) {
                // case-insensitive comparison, since UI adds SQS.SendMessage but SDK uses sqs.SendMessage
                if ( SQSActions.SendMessage.getActionName().equalsIgnoreCase( action.getActionName() ) ) {
                    actionFound = true;
                    break;
                }
            }
        }
        if ( !actionFound ) {
            continue;
        }

        // must be same queue resource
        boolean queueResourceFound = false;
        if ( resources != null ) {
            for ( Resource resource : resources ) {
                if ( resource.getId().equals( queueARN ) ) {
                    queueResourceFound = true;
                    break;
                }
            }
        }
        if ( !queueResourceFound ) {
            continue;
        }

        // found matching statement, check conditions for source ARN
        for ( Condition condition : statement.getConditions() ) {
            if ( logger.isTraceEnabled() ) {
                logger.trace( "condition type: {}, conditionKey: {}", condition.getType(),
                    condition.getConditionKey() );
            }

            // constant-first equals so a condition without type or key is simply skipped
            if ( ArnCondition.ArnComparisonType.ArnLike.name().equals( condition.getType() )
                && ConditionFactory.SOURCE_ARN_CONDITION_KEY.equals( condition.getConditionKey() ) ) {
                matchingConditionFound = true;
                for ( String topicARN : topicARNs ) {
                    if ( !condition.getValues().contains( topicARN ) ) {
                        // topic doesn't exist, add it
                        policyEdited = true;
                        condition.getValues().add( topicARN );
                    }
                }
            }
        }
    }

    if ( !matchingConditionFound ) {
        // never found ArnLike SourceArn condition, need to add a statement
        List<Condition> conditions = new ArrayList<>();
        for ( String topicARN : topicARNs ) {
            conditions.add( ConditionFactory.newSourceArnCondition( topicARN ) );
        }

        Statement statement = new Statement( Statement.Effect.Allow )
            .withPrincipals( Principal.AllUsers )
            .withActions( SQSActions.SendMessage )
            .withResources( new Resource( queueARN ) );
        statement.setConditions( conditions );

        policy.getStatements().add( statement );
        policyEdited = true;
    }

    if ( policyEdited ) {
        // serialize once and reuse for both the request and the log line
        final String updatedPolicyJson = policy.toJson();
        sqsAttributeMap.put( QueueAttributeName.Policy.toString(), updatedPolicyJson );

        // log if permissions are being updated
        logger.info( "updating permissions for queueARN: {}, new policy: {}", queueARN, updatedPolicyJson );

        SetQueueAttributesRequest setQueueAttributesRequest =
            new SetQueueAttributesRequest( queueUrl, sqsAttributeMap );
        try {
            sqs.setQueueAttributes( setQueueAttributesRequest );
        }
        catch ( Exception e ) {
            // best effort: a failed policy update is reported but does not fail the caller
            logger.error( "Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN,
                topicARNs, e );
        }
    }
}