thrall/app/ThrallComponents.scala (77 lines of code) (raw):
import org.apache.pekko.Done
import org.apache.pekko.stream.scaladsl.Source
import com.gu.kinesis.{KinesisRecord, KinesisSource, ConsumerConfig => KclPekkoStreamConfig}
import com.gu.mediaservice.GridClient
import com.gu.mediaservice.lib.config.Services
import com.gu.mediaservice.lib.aws.{S3Ops, ThrallMessageSender}
import com.gu.mediaservice.lib.management.InnerServiceStatusCheckController
import com.gu.mediaservice.lib.metadata.SoftDeletedMetadataTable
import com.gu.mediaservice.lib.play.GridComponents
import com.typesafe.scalalogging.StrictLogging
import controllers.{AssetsComponents, HealthCheck, ReaperController, ThrallController}
import lib._
import lib.elasticsearch._
import lib.kinesis.{KinesisConfig, ThrallEventConsumer}
import play.api.ApplicationLoader.Context
import router.Routes
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success}
/**
  * Component wiring for the Thrall application.
  *
  * Every member except `router` is a strict `val`, so the collaborators below are built eagerly,
  * in declaration order, while this class is being constructed. Construction therefore also
  * performs the side effects written in the body: it asks Elasticsearch to ensure the index and
  * alias exist, blocks on the good-to-go check, and invokes `run()` on both the stream processor
  * and the sync checker. Do not reorder members without checking those dependencies.
  *
  * @param context the Play application loader context handed to `GridComponents`
  * @throws IllegalStateException if the good-to-go check finishes without a success or failure value
  */
class ThrallComponents(context: Context)
  extends GridComponents(context, new ThrallConfig(_))
    with StrictLogging
    with AssetsComponents {

  final override val buildInfo = utils.buildinfo.BuildInfo

  // Storage, notifications, metrics and the Elasticsearch client.
  val store = new ThrallStore(config)
  val metadataEditorNotifications = new MetadataEditorNotifications(config)
  val thrallMetrics = new ThrallMetrics(config, actorSystem, applicationLifecycle)
  val es = new ElasticSearch(
    config.esConfig,
    Some(thrallMetrics),
    actorSystem.scheduler
  )
  es.ensureIndexExistsAndAliasAssigned()

  // Client used to talk to the other Grid services.
  val services: Services = new Services(config.domainRoot, config.serviceHosts, Set.empty)
  val gridClient: GridClient = GridClient(services)(wsClient)

  // Gate: run the critical good-to-go check before any stream consumer is created and before
  // anything reports the service as healthy. `Await.ready` blocks for at most 30 seconds and
  // itself throws if the check has not completed by then.
  private val goodToGoCheckResult = Await.ready(GoodToGoCheck.run(es), 30.seconds)
  goodToGoCheckResult.value match {
    case Some(Failure(exception)) =>
      // Rethrow so that construction of the components (and hence startup) fails.
      logger.error("Failed good to go, aborting startup", exception)
      throw exception
    case Some(Success(_)) =>
      logger.info("Passed good to go")
    case other =>
      // Defensive arm: a future that `Await.ready` returned normally should already hold a value.
      logger.warn(s"Result of good to go was $other, aborting as this result doesn't make sense")
      throw new IllegalStateException("Good to go test didn't pass or throw exception")
  }

  val messageSender = new ThrallMessageSender(config.thrallKinesisStreamConfig)

  // Two Kinesis sources built from separate configs: `config.kinesisConfig` feeds the
  // high-priority (UI) source, `config.kinesisLowPriorityConfig` the low-priority (automation) one.
  val highPriorityKinesisConfig: KclPekkoStreamConfig = KinesisConfig.kinesisConfig(config.kinesisConfig)
  val lowPriorityKinesisConfig: KclPekkoStreamConfig = KinesisConfig.kinesisConfig(config.kinesisLowPriorityConfig)
  val uiSource: Source[KinesisRecord, Future[Done]] = KinesisSource(highPriorityKinesisConfig)
  val automationSource: Source[KinesisRecord, Future[Done]] = KinesisSource(lowPriorityKinesisConfig)

  // Third input to the stream processor; its `send` side is handed to ThrallController below.
  val migrationSourceWithSender: MigrationSourceWithSender = MigrationSourceWithSender(
    materializer,
    auth.innerServiceCall,
    es,
    gridClient,
    config.projectionParallelism
  )

  val thrallEventConsumer = new ThrallEventConsumer(es, thrallMetrics, store, metadataEditorNotifications, actorSystem)

  val thrallStreamProcessor = new ThrallStreamProcessor(
    uiSource,
    automationSource,
    migrationSourceWithSender.source,
    thrallEventConsumer,
    actorSystem
  )

  // `run()` is invoked during construction; the returned future is what the health check inspects.
  val streamRunning: Future[Done] = thrallStreamProcessor.run()

  val s3 = S3Ops.buildS3Client(config)
  val syncChecker = new SyncChecker(s3, es, config.imageBucket, actorSystem)
  val syncCheckerStream: Future[Done] = syncChecker.run()

  val softDeletedMetadataTable = new SoftDeletedMetadataTable(config)
  val maybeCustomReapableEligibility = config.maybeReapableEligibilityClass(applicationLifecycle)

  // HTTP controllers.
  val thrallController = new ThrallController(
    es,
    store,
    migrationSourceWithSender.send,
    messageSender,
    actorSystem,
    auth,
    config.services,
    controllerComponents,
    gridClient
  )
  val reaperController = new ReaperController(
    es,
    store,
    authorisation,
    config,
    actorSystem.scheduler,
    maybeCustomReapableEligibility,
    softDeletedMetadataTable,
    thrallMetrics,
    auth,
    config.services,
    controllerComponents
  )
  // NOTE(review): `streamRunning.isCompleted` only stays current if HealthCheck declares this
  // parameter by-name; if it is by-value the flag is sampled once, here — confirm in HealthCheck.
  val healthCheckController = new HealthCheck(
    es,
    streamRunning.isCompleted,
    config,
    controllerComponents
  )
  val InnerServiceStatusCheckController = new InnerServiceStatusCheckController(
    auth,
    controllerComponents,
    config.services,
    wsClient
  )

  // Lazy, unlike the members above, so the router is only built on first access.
  override lazy val router = new Routes(
    httpErrorHandler,
    thrallController,
    reaperController,
    healthCheckController,
    management,
    InnerServiceStatusCheckController,
    assets
  )
}