app/controllers/BrowseCollectionController.scala (116 lines of code) (raw):
package controllers
import akka.actor.ActorSystem
import akka.stream.Materializer
import auth.{BearerTokenAuth, Security}
import com.sksamuel.elastic4s.http.search.TermsAggResult
import com.theguardian.multimedia.archivehunter.common.clientManagers.{ESClientManager, S3ClientManager}
import com.theguardian.multimedia.archivehunter.common.cmn_models.{PathCacheIndexer, ScanTarget, ScanTargetDAO}
import helpers.{ItemFolderHelper, WithScanTarget}
import javax.inject.{Inject, Singleton}
import play.api.libs.circe.Circe
import play.api.{Configuration, Logger}
import play.api.mvc.{AbstractController, ControllerComponents}
import responses.{ErrorListResponse, GenericErrorResponse, ObjectListResponse, PathInfoResponse}
import io.circe.syntax._
import io.circe.generic.auto._
import org.slf4j.LoggerFactory
import play.api.cache.SyncCacheApi
import requests.SearchRequest
import scala.annotation.switch
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
/**
  * Controller providing the data for browsing collections: the list of collections visible to the
  * current user, the folder tree within a collection and summary statistics for a path.
  *
  * Every action is wrapped in `IsAuthenticatedAsync` (supplied by the `Security` trait).
  */
@Singleton
class BrowseCollectionController @Inject() (override val config:Configuration,
                                            s3ClientMgr:S3ClientManager,
                                            scanTargetDAO:ScanTargetDAO,
                                            override val controllerComponents: ControllerComponents,
                                            esClientMgr:ESClientManager,
                                            override val bearerTokenAuth:BearerTokenAuth,
                                            folderHelper:ItemFolderHelper,
                                            override val cache:SyncCacheApi)
                                           (implicit actorSystem:ActorSystem, mat:Materializer)
  extends AbstractController(controllerComponents) with Security with WithScanTarget with Circe{
  import com.sksamuel.elastic4s.http.ElasticDsl._
  import scala.util.control.NonFatal

  override protected val logger=LoggerFactory.getLogger(getClass)

  // NOTE(review): not referenced anywhere in this class - candidate for removal
  private val awsProfile = config.getOptional[String]("externalData.awsProfile")
  private val esClient = esClientMgr.getClient()
  // name of the index that pathSummary searches
  private val indexName = config.get[String]("externalData.indexName")
  // the path cache index name falls back to "pathcache" if it is not configured
  private val pathCacheIndexer = new PathCacheIndexer(config.getOptional[String]("externalData.pathCacheIndex").getOrElse("pathcache"), esClient)

  /**
    * get all of the "subfolders" ("common prefix" in s3 parlance) for the provided bucket, as recorded
    * in the path cache index.
    * this is to drive the tree view in the browse window
    * @param collectionName s3 bucket to query
    * @param prefix parent folder to list. If None or an empty string, then lists the root
    * @return a 200 response containing an ObjectListResponse of the keys of the paths that were found
    *         (the entry count is always reported as -1), or a 500 response containing a
    *         GenericErrorResponse if the path cache lookup failed
    */
  def getFolders(collectionName:String, prefix:Option[String]) = IsAuthenticatedAsync { uid=> request=>
    // an empty prefix is treated the same as no prefix at all
    val maybePrefix = prefix.filter(_ != "")
    // number of "/"-separated parts in the prefix; zero when listing the root
    val prefixPartsLength = maybePrefix.fold(0)(_.split("/").length)

    // NOTE(review): the third argument is presumably the path depth to return, i.e. one level below the prefix - confirm against PathCacheIndexer
    pathCacheIndexer.getPaths(collectionName, maybePrefix, prefixPartsLength+1)
      .map(results=>{
        logger.debug("getFolders got result: ")
        results.foreach(summ=>logger.debug(s"\t$summ"))
        Ok(ObjectListResponse("ok","folder",results.map(_.key),-1).asJson)
      }).recover({
        case NonFatal(err)=>
          logger.error("Could not get prefixes from index: ", err)
          InternalServerError(GenericErrorResponse("error", err.toString).asJson)
      })
  }

  /**
    * converts a TermsAggResult from Elasticsearch into a simple map of bucket->docCount
    * @param result TermsAggResult from Elasticsearch
    * @return a Map of bucket key to the document count of that bucket
    */
  def bucketsToCountMap(result:TermsAggResult) = {
    result.buckets.map(b=>b.key -> b.docCount).toMap
  }

  /**
    * summarises the indexed items of a collection: total hit count, summed size and the counts per
    * value of the "beenDeleted", "beenProxied" and "mimeType.major.keyword" fields.
    * The request body must be a JSON SearchRequest (parsed with `circe.json(2048)`), whose search
    * parameters are added to the query.
    * @param collectionName s3 bucket to query; matched against "bucket.keyword"
    * @param prefix if set, only items whose "path" term equals this value are included
    * @return a 200 response containing a PathInfoResponse, a 400 response if the body is not a valid
    *         SearchRequest, or a 500 response if the search returned a non-200 status or failed
    */
  def pathSummary(collectionName:String, prefix:Option[String]) = IsAuthenticatedAsync(circe.json(2048)) { uid=> request=>
    request.body.as[SearchRequest].fold(
      err=>
        // nothing asynchronous to do here, so don't schedule a task just to wrap the value
        Future.successful(BadRequest(GenericErrorResponse("bad_request", err.toString).asJson)),
      searchRequest=>
        // NOTE(review): presumably this responds with an error if collectionName is not a registered scan target; the target itself is not needed here
        withScanTargetAsync(collectionName, scanTargetDAO) { _=>
          val queries = Seq(
            Some(matchQuery("bucket.keyword", collectionName)),
            prefix.map(pfx=>termQuery("path", pfx))
          ).flatten ++ searchRequest.toSearchParams

          val aggs = Seq(
            sumAgg("totalSize","size"),
            termsAgg("deletedCounts","beenDeleted"),
            termsAgg("proxiedCounts","beenProxied"),
            termsAgg("typesCount", "mimeType.major.keyword")
          )

          esClient.execute(search(indexName) query boolQuery().must(queries) aggregations aggs).map(response=>{
            (response.status: @switch) match {
              case 200=>
                val aggregations = response.result.aggregations
                logger.info(s"Got ${aggregations}")
                Ok(PathInfoResponse("ok",
                  response.result.hits.total,
                  aggregations.sum("totalSize").value.toLong,
                  bucketsToCountMap(aggregations.terms("deletedCounts")),
                  bucketsToCountMap(aggregations.terms("proxiedCounts")),
                  bucketsToCountMap(aggregations.terms("typesCount"))
                ).asJson)
              case _=>
                InternalServerError(GenericErrorResponse("search_error", response.error.reason).asJson)
            }
          }).recover({
            // a failed future (e.g. the request to the index could not be made) is reported in the
            // same JSON error format as every other failure, consistent with getFolders
            case NonFatal(err)=>
              logger.error(s"Could not get path summary for $collectionName: ", err)
              InternalServerError(GenericErrorResponse("search_error", err.toString).asJson)
          })
        })
  }

  /**
    * lists the bucket names of the scan targets that the logged-in user is allowed to see, sorted.
    * A user whose profile has `allCollectionsVisible` set gets every bucket, anybody else only gets
    * the ones that are also present in the profile's `visibleCollections`.
    * @return a 200 response containing an ObjectListResponse of bucket names, a 500 response if any
    *         scan target could not be loaded or the login profile is corrupted, or a 403 response if
    *         there is no user profile in the session
    */
  def getCollections() = IsAuthenticatedAsync { uid=> request=>
    userProfileFromSession(request.session) match {
      case Some(Right(profile)) =>
        scanTargetDAO.allScanTargets().map(resultList => {
          val errors = resultList.collect({ case Left(err) => err })
          if (errors.nonEmpty) {
            // a single failed record fails the whole request
            InternalServerError(ErrorListResponse("db_error", "", errors.map(_.toString)).asJson)
          } else {
            val collectionsList = resultList.collect({ case Right(target) => target.bucketName })
            val allowedCollections = if(profile.allCollectionsVisible){
              collectionsList
            } else {
              collectionsList.intersect(profile.visibleCollections)
            }
            Ok(ObjectListResponse("ok", "collection", allowedCollections.sorted, allowedCollections.length).asJson)
          }
        })
      case Some(Left(error))=>
        logger.error(s"Corrupted login profile? ${error.toString}")
        Future.successful(InternalServerError(GenericErrorResponse("profile_error","Your login profile seems corrupted, try logging out and logging in again").asJson))
      case None=>
        logger.error(s"No user profile in session")
        Future.successful(Forbidden(GenericErrorResponse("profile_error","You do not appear to be logged in").asJson))
    }
  }
}