in src/main/java/com/awslabs/aws/greengrass/provisioner/implementations/helpers/BasicGroupUpdateHelper.java [344:415]
private void addOrRemoveSubscription(UpdateArguments updateArguments, GroupInformation groupInformation) {
List<Subscription> subscriptions = v2GreengrassHelper.getSubscriptions(groupInformation)
.orElseThrow(() -> new RuntimeException("Group not found, can not continue"));
String source = updateArguments.subscriptionSource;
String target = updateArguments.subscriptionTarget;
String subject = updateArguments.subscriptionSubject;
GreengrassGroupId greengrassGroupId = ImmutableGreengrassGroupId.builder().groupId(groupInformation.id()).build();
if (updateArguments.addSubscription) {
List<Function> functions = v2GreengrassHelper.getFunctions(groupInformation)
.orElseThrow(() -> new RuntimeException("Group not found, can not continue"));
List<Device> devices = v2GreengrassHelper.getDevices(groupInformation)
.orElseThrow(() -> new RuntimeException("Group not found, can not continue"));
// Is the source valid?
Optional<String> validatedSource = validateSubscriptionSourceOrTarget(source, functions, devices);
if (!validatedSource.isPresent()) {
// Source is invalid
throw new RuntimeException(String.join("", "Source [", source, "] ", SUBSCRIPTION_ERROR));
} else {
source = validatedSource.get();
}
// Is the target valid?
Optional<String> validatedTarget = validateSubscriptionSourceOrTarget(target, functions, devices);
if (!validatedTarget.isPresent()) {
// Target is invalid
throw new RuntimeException(String.join("", "Target [", target, "] ", SUBSCRIPTION_ERROR));
} else {
target = validatedTarget.get();
}
Set<Subscription> existingSubscriptions = getMatchingSubscriptions(subscriptions, source, target, subject);
if (existingSubscriptions.size() != 0) {
throw new RuntimeException("Subscription already exists. Nothing to do.");
}
Subscription subscription = subscriptionHelper.createSubscription(source, target, subject);
subscriptions.add(subscription);
log.info(String.join("", "Subscription added [", subscription.source(), ", ", subscription.target(), ", ", subscription.subject(), "]"));
} else if (updateArguments.removeSubscription) {
Set<Subscription> existingSubscriptions = getMatchingSubscriptions(subscriptions, source, target, subject);
if (existingSubscriptions.size() == 0) {
throw new RuntimeException("Subscription doesn't exist. Nothing to do.");
} else if (existingSubscriptions.size() > 1) {
log.error(jsonHelper.toJson(existingSubscriptions));
throw new RuntimeException("More than one matching subscription exists. Specify a more specific match.");
}
// Exactly one, remove it
Subscription subscriptionToRemove = existingSubscriptions.iterator().next();
subscriptions.remove(subscriptionToRemove);
log.info(String.join("", "Subscription removed [", subscriptionToRemove.source(), ", ", subscriptionToRemove.target(), ", ", subscriptionToRemove.subject(), "]"));
} else {
throw new RuntimeException("This should never happen. This is a bug.");
}
String newSubscriptionDefinitionVersionArn = greengrassHelper.createSubscriptionDefinitionAndVersion(subscriptions);
GroupVersion newGroupVersion = GroupVersion.builder()
.subscriptionDefinitionVersionArn(newSubscriptionDefinitionVersionArn)
.build();
// Do nothing extra if successful
createAndWaitForDeployment(greengrassGroupId, newGroupVersion);
}