backend/app/AppComponents.scala (221 lines of code) (raw):

import org.apache.pekko.actor.{ActorSystem, CoordinatedShutdown} import org.apache.pekko.actor.CoordinatedShutdown.Reason import cats.syntax.either._ import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.sqs.{AmazonSQSClient, AmazonSQSClientBuilder} import com.gu.pandomainauth import com.gu.pandomainauth.PublicSettings import controllers.AssetsComponents import controllers.api._ import controllers.frontend.App import controllers.genesis.Genesis import extraction.archives.{RarExtractor, ZipExtractor} import extraction.email.eml.{EmlEmailExtractor, EmlParser} import extraction.email.mbox.MBoxEmailExtractor import extraction.email.msg.MsgEmailExtractor import extraction.email.olm.OlmEmailExtractor import extraction.email.pst.PstEmailExtractor import extraction.ocr.{ImageOcrExtractor, OcrMyPdfExtractor, OcrMyPdfImageExtractor, TesseractPdfOcrExtractor} import extraction.tables.{CsvTableExtractor, ExcelTableExtractor} import extraction.{DocumentBodyExtractor, ExternalTranscriptionExtractor, ExternalTranscriptionWorker, MimeTypeMapper, TranscriptionExtractor, Worker} import ingestion.phase2.IngestStorePolling import org.bouncycastle.jce.provider.BouncyCastleProvider import org.neo4j.driver.v1.{AuthTokens, GraphDatabase} import play.api.ApplicationLoader.Context import play.api.BuiltInComponentsFromContext import play.api.libs.ws.ahc.AhcWSComponents import play.api.mvc.EssentialFilter import play.filters.HttpFiltersComponents import router.Routes import services._ import services.annotations.Neo4jAnnotations import services.events.ElasticsearchEvents import services.index.{ElasticsearchPages, ElasticsearchResources, Pages2} import services.ingestion.IngestionServices import services.manifest.Neo4jManifest import services.observability.{PostgresClientDoNothing, PostgresClientImpl} import services.previewing.PreviewService import services.table.ElasticsearchTable import services.users.Neo4jUserManagement import utils._ import utils.attempt.AttemptAwait._ import utils.auth.providers.{DatabaseUserProvider, PanDomainUserProvider} import utils.auth.totp.{SecureSecretGenerator, Totp} import utils.auth.{DefaultAuthActionBuilder, PasswordHashing, PasswordValidator} import utils.aws.S3Client import utils.controller.{AuthControllerComponents, CloudWatchReportingFailureToResultMapper} import java.net.InetAddress import java.nio.file.Paths import java.security.Security import java.time.Clock import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NonFatal class AppComponents(context: Context, config: Config) extends BuiltInComponentsFromContext(context) with AssetsComponents with HttpFiltersComponents with AhcWSComponents with Logging { // TODO MRB: should we have allowed hosts enabled? how could it point to the ELB? val disabledFilters: Set[EssentialFilter] = Set(allowedHostsFilter) override def httpFilters: Seq[EssentialFilter] = { super.httpFilters.filterNot(disabledFilters.contains) ++ Seq( new AllowFrameFilter, new RequestLoggingFilter(materializer), new ReadOnlyFilter(config.app, materializer) ) } Security.addProvider(new BouncyCastleProvider()) val router = try { val workerExecutionContext = actorSystem.dispatchers.lookup("work-context") val neo4jExecutionContext = actorSystem.dispatchers.lookup("neo4j-context") val s3ExecutionContext = actorSystem.dispatchers.lookup("s3-context") val ingestionExecutionContext = actorSystem.dispatchers.lookup("ingestion-context") val s3Client = new S3Client(config.s3)(s3ExecutionContext) val sqsClient = if (config.sqs.endpoint.isDefined) AmazonSQSClientBuilder.standard().withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(config.sqs.endpoint.get, config.sqs.region)).build() else AmazonSQSClientBuilder.standard().withRegion(config.sqs.region).build() val workerName = config.worker.name.getOrElse(InetAddress.getLocalHost.getHostName) val scratchSpace = new ScratchSpace(Paths.get(config.ingestion.scratchPath)) scratchSpace.setup().await() // data storage services val ingestStorage = S3IngestStorage(s3Client, config.s3.buckets.ingestion, config.s3.buckets.deadLetter).valueOr(failure => throw new Exception(failure.msg)) val blobStorage = S3ObjectStorage(s3Client, config.s3.buckets.collections).valueOr(failure => throw new Exception(failure.msg)) val transcriptStorage = S3ObjectStorage(s3Client, config.s3.buckets.transcription).valueOr(failure => throw new Exception(failure.msg)) val postgresClient = config.postgres match { case Some(postgresConfig) => new PostgresClientImpl(postgresConfig) case None => logger.warn("Postgres config not found, using dummy postgres client!") new PostgresClientDoNothing } val esClient = ElasticsearchClient(config).await() val esResources = new ElasticsearchResources(esClient, config.elasticsearch.indexName).setup().await() val esTables = new ElasticsearchTable(esClient, config.elasticsearch.tableRowIndexName).setup().await() val esEvents = new ElasticsearchEvents(esClient, config.elasticsearch.eventIndexName).setup().await() val esPages = new ElasticsearchPages(esClient, config.elasticsearch.pageIndexNamePrefix).setup().await() val pages2 = new Pages2(esClient, config.elasticsearch.pageIndexNamePrefix) val neo4jDriver = GraphDatabase.driver(config.neo4j.url, AuthTokens.basic(config.neo4j.user, config.neo4j.password)) val manifest = Neo4jManifest.setupManifest(neo4jDriver, neo4jExecutionContext, config.neo4j.queryLogging).valueOr(failure => throw new Exception(failure.msg)) val annotations = Neo4jAnnotations.setupAnnotations(neo4jDriver, neo4jExecutionContext, config.neo4j.queryLogging).valueOr(failure => throw new Exception(failure.msg)) val users = Neo4jUserManagement(neo4jDriver, neo4jExecutionContext, config.neo4j.queryLogging, manifest, esResources, esPages, annotations) val metricsService = config.aws.map(new CloudwatchMetricsService(_)).getOrElse(new NoOpMetricsService()) val userProvider = config.auth.provider match { case config: DatabaseAuthConfig => val secureSecretGenerator = new SecureSecretGenerator() val totpService = Totp.googleAuthenticatorInstance() val passwordHashingService = new PasswordHashing() val passwordValidator = new PasswordValidator(config.minPasswordLength) new DatabaseUserProvider(config, passwordHashingService, users, totpService, secureSecretGenerator, passwordValidator) case config: PandaAuthConfig => val credentials = AwsCredentials(profile = config.aws.profile) val pandaS3Client = AwsS3Clients.pandaS3Client(credentials, config.aws.region) val publicSettings = new PublicSettings(config.publicSettingsKey, config.bucketName, pandaS3Client) // start polling of S3 bucket for public key publicSettings.start() new PanDomainUserProvider(config, () => publicSettings.verification, users, metricsService) } logger.info(s"Initialised authentication provider '${config.auth.provider}'") // processing services val tika = Tika.createInstance val mimeTypeMapper = new MimeTypeMapper() val ingestionServices = IngestionServices(manifest, esResources, blobStorage, tika, mimeTypeMapper, postgresClient) // Preview val previewStorage = S3ObjectStorage(s3Client, config.s3.buckets.preview).valueOr(failure => throw new Exception(failure.msg)) val previewService = PreviewService(config.preview, esResources, blobStorage, previewStorage) // extractors val documentBodyExtractor = new DocumentBodyExtractor(tika, esResources) val zipExtractor = new ZipExtractor(scratchSpace, ingestionServices) val rarExtractor = new RarExtractor(scratchSpace, ingestionServices) val pstExtractor = new PstEmailExtractor(scratchSpace, ingestionServices) val olmExtractor = new OlmEmailExtractor(scratchSpace, ingestionServices) val msgExtractor = new MsgEmailExtractor(scratchSpace, ingestionServices, tika) val emlParser = new EmlParser(scratchSpace, ingestionServices) val emlExtractor = new EmlEmailExtractor(emlParser) val mboxExtractor = new MBoxEmailExtractor(emlParser) val tesseractPdfOcrExtractor = new TesseractPdfOcrExtractor(config.ocr, scratchSpace, esResources, esPages, ingestionServices) val ocrMyPdfExtractor = new OcrMyPdfExtractor(scratchSpace, esResources, esPages, previewStorage, ingestionServices) val imageOcrExtractor = new ImageOcrExtractor(config.ocr, scratchSpace, esResources, ingestionServices) val ocrMyPdfImageExtractor = new OcrMyPdfImageExtractor(config.ocr, scratchSpace, esResources, previewStorage, ingestionServices) val transcriptionExtractor = if (config.worker.useExternalExtractors) { new ExternalTranscriptionExtractor(esResources, config.transcribe, blobStorage, transcriptStorage, sqsClient) } else { new TranscriptionExtractor(esResources, scratchSpace, config.transcribe) } val ocrExtractors = config.ocr.defaultEngine match { case OcrEngine.OcrMyPdf => List(ocrMyPdfExtractor, ocrMyPdfImageExtractor) case OcrEngine.Tesseract => List(tesseractPdfOcrExtractor, imageOcrExtractor) } val csvTableExtractor = new CsvTableExtractor(scratchSpace, esTables) val excelTableExtractor = new ExcelTableExtractor(scratchSpace, esTables) val extractors = List(olmExtractor, zipExtractor, rarExtractor, documentBodyExtractor, pstExtractor, emlExtractor, msgExtractor, mboxExtractor, csvTableExtractor, excelTableExtractor, transcriptionExtractor) ++ ocrExtractors extractors.foreach(mimeTypeMapper.addExtractor) // Common components val failureToResultMapper = new CloudWatchReportingFailureToResultMapper(metricsService) val authActionBuilder = new DefaultAuthActionBuilder(controllerComponents, failureToResultMapper, config.auth.timeouts.maxLoginAge, config.auth.timeouts.maxVerificationAge, users)(configuration, Clock.systemUTC()) val authControllerComponents = new AuthControllerComponents(authActionBuilder, failureToResultMapper, users, controllerComponents) // Controllers val appController = new App(controllerComponents, assets, config, userProvider, config.aws) val authController = new Authentication(authControllerComponents, userProvider, users, config)(configuration, Clock.systemUTC()) val genesisController = new Genesis(controllerComponents, userProvider, users, config.auth.enableGenesisFlow) val eventsController = new Events(authControllerComponents, esEvents) val collectionsController = new Collections(authControllerComponents, manifest, users, esResources, config.s3, esEvents, esPages, ingestionServices, annotations) val blobsController = new Blobs(authControllerComponents, manifest, esResources, blobStorage, previewStorage, postgresClient) val filtersController = new Filters(authControllerComponents, manifest, annotations) val searchController = new Search(authControllerComponents, users, esResources, annotations, metricsService) val documentsController = new Documents(authControllerComponents, manifest, esResources, blobStorage, users, annotations, config.auth.timeouts.maxDownloadAuthAge) val resourceController = new Resource(authControllerComponents, manifest, esResources, esPages, annotations, previewStorage) val emailController = new Emails(authControllerComponents, manifest, esResources, annotations) val mimeTypesController = new MimeTypes(authControllerComponents, manifest) val previewController = new Previews(authControllerComponents, manifest, esResources, previewService, users, annotations, config.auth.timeouts.maxDownloadAuthAge) val workspacesController = new Workspaces(authControllerComponents, annotations, esResources, manifest, users, blobStorage, previewStorage, postgresClient) val commentsController = new Comments(authControllerComponents, manifest, esResources, annotations) val usersController = new Users(authControllerComponents, userProvider) val pagesController = new PagesController(authControllerComponents, manifest, esResources, pages2, annotations, previewStorage) val ingestionController = new Ingestion(authControllerComponents, ingestStorage) val ingestionEventsController = new IngestionEvents(authControllerComponents, postgresClient, users ) val workerControl = config.aws match { case Some(awsDiscoveryConfig) => new AWSWorkerControl(config.worker, awsDiscoveryConfig, ingestStorage, manifest) case None => new PekkoWorkerControl(actorSystem) } // Schedulers if (config.worker.enabled) { logger.info("Worker enabled on this instance") // PFI processors val worker = new Worker(workerName, manifest, blobStorage, extractors, metricsService, postgresClient)(workerExecutionContext) // ingestion phase 2 val phase2IngestionScheduler = new IngestStorePolling(actorSystem, workerExecutionContext, workerControl, ingestStorage, scratchSpace, ingestionServices, config.ingestion.batchSize, metricsService, postgresClient) phase2IngestionScheduler.start() applicationLifecycle.addStopHook(() => phase2IngestionScheduler.stop()) // extractor val workerScheduler = new WorkerScheduler(actorSystem, worker, config.worker.interval)(workerExecutionContext) workerScheduler.start() applicationLifecycle.addStopHook(() => workerScheduler.stop()) // external extractor val externalWorker = new ExternalTranscriptionWorker(manifest, sqsClient, config.transcribe, transcriptStorage, esResources) val externalWorkerScheduler = new ExternalWorkerScheduler(actorSystem, externalWorker, config.worker.interval)(workerExecutionContext) externalWorkerScheduler.start() applicationLifecycle.addStopHook(() => externalWorkerScheduler.stop()) } else { logger.info("Worker disabled on this instance") workerControl.start(actorSystem.scheduler)(workerExecutionContext) applicationLifecycle.addStopHook(() => workerControl.stop()) } // Router new Routes( httpErrorHandler, eventsController, collectionsController, ingestionController, ingestionEventsController, blobsController, filtersController, searchController, documentsController, commentsController, resourceController, pagesController, emailController, mimeTypesController, workspacesController, previewController, usersController, authController, appController, genesisController, assets ) } catch { case NonFatal(e) => // If an exception is thrown then it's helpful (in dev mode at least) to try to shutdown anything that has // already started up. This will ensure that the actor system and other resources are properly tidied up // // If the exception comes initialising the actor system itself then running the CoordinatedShutdown will try and // initialise it again, so we also log the original error to make sure we see it logger.error("Error during initialisation, starting co-ordinated shutdown", e) Await.ready(CoordinatedShutdown(actorSystem).run(new Reason {}), 10 seconds) throw e } }