hq/app/services/CacheService.scala (189 lines of code) (raw):
package services
import aws.AwsClients
import aws.iam.IAMClient
import aws.support.{TrustedAdvisorExposedIAMKeys, TrustedAdvisorS3}
import config.Config
import model._
import play.api._
import play.api.inject.ApplicationLifecycle
import rx.lang.scala.Observable
import utils.attempt.{Attempt, FailedAttempt, Failure}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import org.joda.time.DateTime
import utils.Box
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.iam.IamAsyncClient
import software.amazon.awssdk.services.cloudformation.CloudFormationAsyncClient
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.support.SupportAsyncClient
class CacheService(
config: Configuration,
lifecycle: ApplicationLifecycle,
environment: Environment,
cfnClients: AwsClients[CloudFormationAsyncClient],
taClients: AwsClients[SupportAsyncClient],
s3Clients: AwsClients[S3Client],
iamClients: AwsClients[IamAsyncClient],
regions: List[Region]
)(implicit ec: ExecutionContext)
extends Logging {
private val accounts = Config.getAwsAccounts(config)
private def startingCache(cacheContent: String) = {
accounts
.map(acc =>
(acc, Left(Failure.notYetLoaded(acc.id, cacheContent).attempt))
)
.toMap
}
private val publicBucketsBox
: Box[Map[AwsAccount, Either[FailedAttempt, List[BucketDetail]]]] = Box(
startingCache("public buckets")
)
private val credentialsBox
: Box[Map[AwsAccount, Either[FailedAttempt, CredentialReportDisplay]]] =
Box(startingCache("credentials"))
private val exposedKeysBox
: Box[Map[AwsAccount, Either[FailedAttempt, List[ExposedIAMKeyDetail]]]] =
Box(startingCache("exposed keys"))
def getAllPublicBuckets
: Map[AwsAccount, Either[FailedAttempt, List[BucketDetail]]] =
publicBucketsBox.get()
def getPublicBucketsForAccount(
awsAccount: AwsAccount
): Either[FailedAttempt, List[BucketDetail]] = {
publicBucketsBox
.get()
.getOrElse(
awsAccount,
Left(
Failure
.cacheServiceErrorPerAccount(awsAccount.id, "public buckets")
.attempt
)
)
}
def getAllCredentials
: Map[AwsAccount, Either[FailedAttempt, CredentialReportDisplay]] =
credentialsBox.get()
def getCredentialsForAccount(
awsAccount: AwsAccount
): Either[FailedAttempt, CredentialReportDisplay] = {
credentialsBox
.get()
.getOrElse(
awsAccount,
Left(
Failure
.cacheServiceErrorPerAccount(awsAccount.id, "credentials")
.attempt
)
)
}
def getAllExposedKeys
: Map[AwsAccount, Either[FailedAttempt, List[ExposedIAMKeyDetail]]] =
exposedKeysBox.get()
def getExposedKeysForAccount(
awsAccount: AwsAccount
): Either[FailedAttempt, List[ExposedIAMKeyDetail]] = {
exposedKeysBox
.get()
.getOrElse(
awsAccount,
Left(
Failure
.cacheServiceErrorPerAccount(awsAccount.id, "exposed keys")
.attempt
)
)
}
def refreshCredentialsBox(): Unit = {
logger.info("Started refresh of the Credentials data")
for {
updatedCredentialReports <- IAMClient.getAllCredentialReports(
accounts,
credentialsBox.get(),
cfnClients,
iamClients,
regions
)
} yield {
logCacheDataStatus("Credentials", updatedCredentialReports)
credentialsBox.send(updatedCredentialReports.toMap)
}
}
private def refreshPublicBucketsBox(): Unit = {
logger.info("Started refresh of the public S3 buckets data")
for {
allPublicBuckets <- TrustedAdvisorS3.getAllPublicBuckets(
accounts,
taClients,
s3Clients
)
} yield {
logCacheDataStatus("Public buckets", allPublicBuckets)
publicBucketsBox.send(allPublicBuckets.toMap)
}
}
private def refreshExposedKeysBox(): Unit = {
logger.info("Started refresh of the Exposed Keys data")
for {
allExposedKeys <- TrustedAdvisorExposedIAMKeys.getAllExposedKeys(
accounts,
taClients
)
} yield {
logCacheDataStatus("Exposed Keys", allExposedKeys)
exposedKeysBox.send(allExposedKeys.toMap)
}
}
if (environment.mode != Mode.Test) {
val initialDelay =
if (environment.mode == Mode.Prod) 10.seconds
else Duration.Zero
val publicBucketsSubscription =
Observable.interval(initialDelay + 1000.millis, 5.minutes).subscribe {
_ =>
refreshPublicBucketsBox()
}
val exposedKeysSubscription =
Observable.interval(initialDelay + 2000.millis, 5.minutes).subscribe {
_ =>
refreshExposedKeysBox()
}
val credentialsSubscription =
Observable.interval(initialDelay + 4000.millis, 5.minutes).subscribe {
_ =>
refreshCredentialsBox()
}
lifecycle.addStopHook { () =>
publicBucketsSubscription.unsubscribe()
exposedKeysSubscription.unsubscribe()
credentialsSubscription.unsubscribe()
Future.successful(())
}
}
/**
* Prints an overview of this cache data.
*
* If everything succeeded then we say as much. If the cache data contains failures
* we log a warning that shows which accounts are affected and give one failure
* as the underlying cause, if available.
*/
def logCacheDataStatus[A](cacheName: String, data: Seq[(AwsAccount, Either[FailedAttempt, A])]): Unit = {
val (successful, failed) = data.partition { case (_, result) => result.isRight }
if (failed.isEmpty) {
logger.info(s"$cacheName updated: All ${data.size} accounts successful")
} else {
val failedAccountsDetails = failed.flatMap {
case (account, Left(failedAttempt)) =>
Some(s"${account.name}: ${failedAttempt.logMessage}")
case _ => None
}.mkString(", ")
val logMessage = s"$cacheName updated: ${successful.size}/${data.size} accounts succeeded. Failed accounts: $failedAccountsDetails"
failed.flatMap {
case (_, Left(failedAttempt)) =>
failedAttempt.firstException
case _ => None
}.headOption match {
case None =>
logger.warn(logMessage)
case Some(exampleCausedBy) =>
logger.warn(s"$logMessage - see stacktrace for an example cause", exampleCausedBy)
}
}
}
}