app/controllers/UploadController.scala (103 lines of code) (raw):
package controllers
import ai.x.play.json.Encoders._
import com.amazonaws.services.stepfunctions.model.{ExecutionAlreadyExistsException, ExecutionListItem}
import com.gu.media.MediaAtomMakerPermissionsProvider
import com.gu.media.logging.Logging
import com.gu.media.model.{ClientAsset, ClientAssetProcessing, MediaAtom, YouTubeAsset}
import com.gu.media.upload.model._
import com.gu.media.util.{MediaAtomHelpers, MediaAtomImplicits}
import com.gu.media.youtube.YouTubeVideos
import com.gu.pandahmac.HMACAuthActions
import data.{DataStores, UnpackedDataStores}
import ai.x.play.json.Jsonx
import play.api.libs.json.{Format, Json}
import play.api.mvc.ControllerComponents
import util._
import scala.annotation.tailrec
import scala.util.control.NonFatal
class UploadController(override val authActions: HMACAuthActions, awsConfig: AWSConfig, stepFunctions: StepFunctions,
override val stores: DataStores, override val permissions: MediaAtomMakerPermissionsProvider,
youTube: YouTubeVideos, override val controllerComponents: ControllerComponents)
extends AtomController with Logging with JsonRequestParsing with UnpackedDataStores with MediaAtomImplicits {
import authActions.APIAuthAction
private val credsGenerator = new CredentialsGenerator(awsConfig)
private val uploadDecorator = new UploadDecorator(awsConfig, stepFunctions)
def list(atomId: String) = APIAuthAction { req =>
val atom = MediaAtom.fromThrift(getPreviewAtom(atomId))
val withStatus = ClientAsset.fromAssets(atom.assets).map(addYouTubeStatus)
val assets = withStatus.map { asset => uploadDecorator.addMetadata(atom.id, asset) }
val jobs = stepFunctions.getJobs(atomId)
val uploads = jobs.flatMap(getRunning(assets, _))
Ok(Json.toJson(uploads ++ assets))
}
def create = LookupPermissions { implicit raw =>
parse(raw) { req: UploadRequest =>
if(req.selfHost && !raw.permissions.addSelfHostedAsset) {
Unauthorized(s"User ${raw.user.email} is not authorised with permissions to upload self-hosted asset")
} else {
log.info(s"Request for upload under atom ${req.atomId}. filename=${req.filename}. size=${req.size}, selfHosted=${req.selfHost}")
val thriftAtom = getPreviewAtom(req.atomId)
val atom = MediaAtom.fromThrift(thriftAtom)
val version = MediaAtomHelpers.getNextAssetVersion(thriftAtom.tdata)
val upload = start(atom, raw.user.email, req, version)
log.info(s"Upload created under atom ${req.atomId}. upload=${upload.id}. parts=${upload.parts.size}, selfHosted=${upload.metadata.selfHost}")
Ok(Json.toJson(upload))
}
}
}
def credentials(id: String, key: String) = LookupPermissions { implicit req =>
getPart(id, key) match {
case Some(part) =>
val credentials = credsGenerator.forKey(part.key)
Ok(Json.toJson(credentials))
case None =>
NotFound
}
}
@tailrec
private def start(atom: MediaAtom, email: String, req: UploadRequest, version: Long): Upload = try {
val upload = UploadBuilder.build(atom, email, version, req, awsConfig)
stepFunctions.start(upload)
upload
} catch {
case _: ExecutionAlreadyExistsException =>
start(atom, email, req, version + 1)
}
private def addYouTubeStatus(video: ClientAsset): ClientAsset = video.asset match {
case Some(YouTubeAsset(id)) =>
try {
val status = youTube.getProcessingStatus(id).map(ClientAssetProcessing(_))
video.copy(processing = status)
} catch {
case NonFatal(e) =>
log.error(s"Unable to get YouTube status for ${video.id}", e)
video
}
case _ =>
video
}
private def getPart(id: String, key: String): Option[UploadPart] = for {
upload <- stepFunctions.getById(id)
part <- upload.parts.find(_.key == key)
} yield part
private def getRunning(assets: List[ClientAsset], job: ExecutionListItem): Option[ClientAsset] = {
val alreadyAdded = assets.exists { asset =>
job.getName.endsWith(s"-${asset.id}")
}
if(alreadyAdded) {
None
} else {
val events = stepFunctions.getEventsInReverseOrder(job)
val startTimestamp = job.getStartDate.getTime
val upload = stepFunctions.getTaskEntered(events)
val error = stepFunctions.getExecutionFailed(events)
upload.map { case (state, upload) =>
ClientAsset.fromUpload(state, startTimestamp, upload, error)
}
}
}
}
object UploadController {
case class CreateResponse(id: String, region: String, bucket: String, parts: List[UploadPart])
implicit val createResponseFormat: Format[CreateResponse] = Jsonx.formatCaseClass[CreateResponse]
}