app/services/BucketNotificationConfigurations.scala (146 lines of code) (raw):
package services
import com.theguardian.multimedia.archivehunter.common.clientManagers.S3ClientManager
import org.slf4j.LoggerFactory
import play.api.Configuration
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{Event, GetBucketNotificationConfigurationRequest, GetBucketNotificationConfigurationResponse, LambdaFunctionConfiguration, NotificationConfiguration, NotificationConfigurationFilter, PutBucketNotificationConfigurationRequest}
import java.util
import javax.inject.{Inject, Singleton}
import scala.util.{Success, Try}
import scala.jdk.CollectionConverters._
@Singleton
class BucketNotificationConfigurations @Inject()(s3ClientMgr:S3ClientManager, config:Configuration) {
private val logger = LoggerFactory.getLogger(getClass)
private val maybeProfile = config.getOptional[String]("externalData.awsProfile")
/**
* Filter function to extract notifications relevant to us - specifically ones that point to a Lambda function called
* `archivehunter-input-`
* @param defn notificqtion definition, in the form of a tuple name:String->NotificationConfiguration
* @return true if this is one of ours, otherwise false
*/
protected def findRelevantNotification(defn:LambdaFunctionConfiguration):Boolean = defn.lambdaFunctionArn().contains("archivehunter-input")
private val expectedEvents = Set(
Event.S3_OBJECT_CREATED,
Event.S3_OBJECT_REMOVED,
Event.S3_OBJECT_RESTORE_COMPLETED,
Event.S3_OBJECT_RESTORE_DELETE,
Event.S3_REDUCED_REDUNDANCY_LOST_OBJECT,
Event.S3_LIFECYCLE_TRANSITION,
Event.S3_LIFECYCLE_EXPIRATION
)
/**
* Creates a new configuration for the bucket monitor lambda function
* @param lambdaArn ARN of the lambda function
* @return a new LambdaConfiguration
*/
def createNewNotification(lambdaArn:String) = {
LambdaFunctionConfiguration.builder
.lambdaFunctionArn(lambdaArn)
.events(util.EnumSet.copyOf(expectedEvents.asJavaCollection))
.id("ArchiveHunterNotification")
.build()
}
/**
* If the given notification configuration needs updating, then returns an updated version of it.
* Otherwise, returns None
* @param configuration s3 notification configuration
* @return
*/
protected def maybeUpdate(configuration: LambdaFunctionConfiguration):Option[LambdaFunctionConfiguration] = {
val events = configuration.events().asScala.toSeq.toSet
logger.debug(s"NotificationConfiguration has these events: ${events.mkString(";")}")
val missingEvents = expectedEvents.diff(events)
val configBuilder = configuration.toBuilder
val emptyFilter = NotificationConfigurationFilter.builder().build()
val firstUpdate = if(missingEvents.nonEmpty) {
logger.info(s"NotificationConfiguration is missing the following events: ${missingEvents.mkString(";")}, it will be re-written")
Some(configBuilder.events(expectedEvents.asJava))
} else {
None
}
val secondUpdate = if(Option(configuration.filter()).isDefined) {
logger.info(s"NotificationConfiguration has a filter present: ${configuration.filter()}, removing")
Some(configBuilder.filter(emptyFilter))
} else {
None
}
(firstUpdate, secondUpdate) match {
case (None, None) => None
case (Some(u), None)=> Some(u.build())
case (None, Some(u))=> Some(u.build())
case (Some(evts), Some(_)) => Some(evts.filter(emptyFilter).build())
}
}
/**
* If we need to update the given LambdaFunctionConfiguration, returns Some() with the updated config.
* Otherwise returns None
* @param defn existing LambdaFunctionConfiguration
* @return either an updated configuration or None
*/
protected def maybeUpdateNotification(defn:LambdaFunctionConfiguration):Option[LambdaFunctionConfiguration] = {
logger.debug(s"Found Lambda notification with name ${defn.id()}")
if(defn.lambdaFunctionArn().contains("archivehunter-input")) {
maybeUpdate(defn)
} else {
None
}
}
protected def isAdditionRequired(f:java.util.List[LambdaFunctionConfiguration]):Boolean = {
! f.asScala.exists(_.lambdaFunctionArn().contains("archivehunter-input"))
}
/**
* Returns true if there are none of our notifications found in the given configuration, therefore requiring
* a new configuration to be added
* @param config BucketNotificaitonConfiguration instance to test
* @return a boolean value indicating whether we need to add one of our own configurations
*/
protected def isAdditionRequired(config:NotificationConfiguration.Builder):Boolean = {
isAdditionRequired(config
.build()
.lambdaFunctionConfigurations()
)
}
/**
* Gathers the additions and modifications, compares them to the previous values and writes to S3 if they differ
* @param bucketName bucket name to write to
* @param configResponse initial BucketNotificationConfigurationResponse
* @param maybeLambdaArn lambda ARN for creating a new notification if required
* @param requiredUpdates list of updates required to existing notifications
* @param s3Client implicitly provided AmazonS3 client object
* @return a Try, containing a tuple of two boolean values. The first is `true` if updates were required, the second is true if they were written.
* This is for compatibility with the return value of parent function verifyNotificationSetup
*/
private def writeUpdatesIfRequired(bucketName:String,
configResponse:GetBucketNotificationConfigurationResponse,
maybeLambdaArn:Option[String],
requiredUpdates:Seq[LambdaFunctionConfiguration])(implicit s3Client:S3Client): Try[(Boolean, Boolean)] = {
var updateRequired = false
val initialConfiguration = NotificationConfiguration.builder()
.lambdaFunctionConfigurations(configResponse.lambdaFunctionConfigurations())
.eventBridgeConfiguration(configResponse.eventBridgeConfiguration())
.queueConfigurations(configResponse.queueConfigurations())
.topicConfigurations(configResponse.topicConfigurations())
//step one - do we need to add a new monitoring configuration? If so put it into the list
val addedConfiguration = if (isAdditionRequired(initialConfiguration)) {
logger.debug(s"$bucketName: No archivehunter lambda found, adding one...")
maybeLambdaArn match {
case Some(lambdaArn) =>
val existingConfigs = configResponse.lambdaFunctionConfigurations().asScala
val updatedConfigs = existingConfigs :+ createNewNotification(lambdaArn)
updateRequired = true //the SDK builder is a _java_ object which returns a mutable "self" rather than an immutable copy reference.
initialConfiguration.lambdaFunctionConfigurations(updatedConfigs.asJava)
case None =>
throw new RuntimeException("Cannot add a lambda monitor because externalData.bucketMonitorLambdaARN is not set in the application config file")
}
} else {
initialConfiguration
}
//step two - do we need to update any of the existing configurations? If so put them into the list
val updatedConfiguration = if(requiredUpdates.nonEmpty) {
updateRequired = true
addedConfiguration.lambdaFunctionConfigurations(requiredUpdates.asJava)
} else {
addedConfiguration
}
//step three - if the config we built has no changes then do nothing, otherwise write out the updated configuration to S3
if(!updateRequired) {
logger.info(s"$bucketName: No updates were required")
Success((false, false))
} else {
Try {
s3Client.putBucketNotificationConfiguration(PutBucketNotificationConfigurationRequest.builder()
.bucket(bucketName)
.notificationConfiguration(updatedConfiguration.build())
.skipDestinationValidation(true)
.build())
}.map(_ => (true, true))
}
}
/**
*
* @param bucketName
* @param region
* @param shouldWriteUpdates
* @return a Try, which fails on error and on success contains a tuple of two boolean values.
* The first value is `true` if updates to the given bucket were _required_, and the second value
* is `true` if required updates to the bucket were _made_.
*/
def verifyNotificationSetup(bucketName:String, region:Option[Region], shouldWriteUpdates:Boolean) = {
val maybeLambdaArn = config.getOptional[String]("externalData.bucketMonitorLambdaARN")
implicit val s3client = s3ClientMgr.getS3Client(maybeProfile, region)
logger.debug(s"Looking for S3 notifications on bucket $bucketName in region ${region.getOrElse("default")}")
for {
response <- Try { s3client.getBucketNotificationConfiguration(GetBucketNotificationConfigurationRequest.builder().bucket(bucketName).build()) }
requiredUpdates <- Try {
response
.lambdaFunctionConfigurations()
.asScala
.map(maybeUpdateNotification)
.collect({case Some(update)=>update})
.toSeq
}
result <- if(shouldWriteUpdates) {
logger.info(s"$bucketName requires updates, writing the new version...")
writeUpdatesIfRequired(bucketName, response, maybeLambdaArn, requiredUpdates)
} else {
val needsUpdate = requiredUpdates.nonEmpty || isAdditionRequired(response.lambdaFunctionConfigurations())
if(needsUpdate) {
logger.info(s"$bucketName configuration is incorrect and requires updates")
} else {
logger.info(s"$bucketName - no updates need to be made to notification configuration")
}
Success((needsUpdate, false))
}
} yield result
}
}