app/vidispine/VidispineCommunicator.scala (93 lines of code) (raw):

package vidispine

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.HttpExt
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.{Accept, Authorization, BasicHttpCredentials}
import akka.pattern.after
import akka.stream.Materializer
import utils.AkkaHttpHelpers
import utils.AkkaHttpHelpers.{RedirectRequired, RetryRequired, consumeStream, contentBodyToJson}
import org.slf4j.{LoggerFactory, MDC}

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}

/**
 * HTTP client for the Vidispine API. Every outgoing request has basic-auth credentials from `config` added to it.
 *
 * @param config      Vidispine connection settings; `baseUri`, `username` and `password` are read from it
 * @param ec          execution context on which response handling and retries are run
 * @param mat         materializer, required implicitly by the stream/JSON helpers
 * @param actorSystem actor system backing the Akka HTTP client and the scheduler used for retry back-off
 */
class VidispineCommunicator(config:VidispineConfig) (implicit ec:ExecutionContext, mat:Materializer, actorSystem:ActorSystem){
  private final val logger = LoggerFactory.getLogger(getClass)
  // Upper bound on the `start` offset used when paging through a project's files; paging stops once it is exceeded.
  private final val maxFilesToFetch = 10000

  /** Provides the Akka HTTP extension used to send requests. Kept as an overridable method so a subclass can substitute it. */
  protected def callHttp: HttpExt = Http()

  /**
   * Call out to Vidispine and return the content stream if successful. Use this for streaming raw content directly elsewhere.
   *
   * A redirect response is followed and a retry response is re-attempted after a delay of `500 * attempt` milliseconds;
   * both count towards `retryLimit`. The delay is scheduled on the actor system's scheduler, so no thread is blocked
   * while waiting.
   *
   * @param req HttpRequest to undertake, authorization is added to this
   * @param attempt Attempt counter, don't specify this
   * @param retryLimit Maximum number of attempts (redirects included) before giving up
   * @return A Future containing the response entity, or None if the response handler reported no content.
   *         The Future fails with a RuntimeException once `attempt` exceeds `retryLimit`.
   */
  protected def callToVidispineRaw(req: HttpRequest, attempt: Int = 1, retryLimit:Int=10):Future[Option[HttpEntity]] =
    if (attempt > retryLimit) {
      Future.failed(new RuntimeException("Too many retries, see logs for details"))
    } else {
      logger.debug(s"Vidispine request URL is ${req.uri.toString()}")
      val updatedReq = req.withHeaders(req.headers ++ Seq(Authorization(BasicHttpCredentials(config.username, config.password))))

      // Callbacks may run on a different thread from the caller, so capture the logging context now
      // and re-apply it at the start of every callback.
      val loggerContext = Option(MDC.getCopyOfContextMap)
      def restoreLoggerContext(): Unit = loggerContext.foreach(ctx => MDC.setContextMap(ctx))

      callHttp
        .singleRequest(updatedReq)
        .flatMap { response =>
          restoreLoggerContext()
          AkkaHttpHelpers.handleResponse(response, "Vidispine")
        }
        .flatMap {
          case Right(maybeEntity) =>
            restoreLoggerContext()
            Future.successful(maybeEntity)
          case Left(RedirectRequired(newUri)) =>
            restoreLoggerContext()
            logger.info(s"Vidispine redirected to $newUri")
            callToVidispineRaw(req.withUri(newUri), attempt + 1, retryLimit)
          case Left(RetryRequired) =>
            restoreLoggerContext()
            // Linear back-off, scheduled rather than slept so that no execution-context thread is held up.
            val backoff = FiniteDuration(500L * attempt, TimeUnit.MILLISECONDS)
            after(backoff, actorSystem.scheduler) {
              // The delayed block runs on another thread; restore the context so the retry captures it again.
              restoreLoggerContext()
              callToVidispineRaw(req, attempt + 1, retryLimit)
            }
        }
    }

  /**
   * More conventional `callTo` method which adds an "Accept: application/json" for Vidispine and then attempts
   * to decode the content using Circe to the given domain object. If the parsing fails, then the future will fail too.
   * @param req Request to make. Authorization and Accept are both added
   * @param retryLimit Maximum number of retries to do before failing
   * @tparam T Data type to unmarshal returned JSON into
   * @return A Future, containing the data object or None if a 404 was returned. Other responses return an error.
   */
  protected def callToVidispine[T:io.circe.Decoder](req: HttpRequest, retryLimit:Int=10):Future[Option[T]] =
    callToVidispineRaw(
      req.withHeaders(req.headers :+ Accept(MediaRange(MediaTypes.`application/json`))),
      retryLimit = retryLimit
    ).flatMap {
      case None =>
        Future.successful(None)
      case Some(entity) =>
        logger.debug(s"Vidispine URL was ${req.uri} with headers ${req.headers}")
        contentBodyToJson(consumeStream(entity.dataBytes))
    }

  /**
   * Fetches the online files of a project, paging through the search results and dropping any entry that
   * could not be converted to a [[VSOnlineOutputMessage]].
   * @param projectId project whose files are wanted
   * @param pageSize number of items to request per page
   * @return A Future containing every successfully converted item
   */
  def getFilesOfProject(projectId: Int, pageSize: Int = 100): Future[Seq[VSOnlineOutputMessage]] =
    recursivelyGetFilesOfProject(projectId, pageSize = pageSize).map(_.flatten)

  /**
   * Fetches pages of search results one after another, accumulating them, until a page comes back empty or the
   * `start` offset has gone past `maxFilesToFetch` (in which case a warning is logged).
   * @param projectId project whose files are wanted
   * @param start offset of the first item to request on this call
   * @param pageSize number of items to request per page
   * @param existingResults results accumulated by earlier calls; callers normally leave this at its default
   * @return A Future containing the accumulated results, including the page fetched by this call
   */
  def recursivelyGetFilesOfProject(projectId: Int, start: Int = 1, pageSize: Int = 100, existingResults: Seq[Option[VSOnlineOutputMessage]] = Seq()): Future[Seq[Option[VSOnlineOutputMessage]]] =
    getPageOfFilesOfProject(projectId, start, pageSize).flatMap { results =>
      val accumulated = existingResults ++ results
      val limitExceeded = start > maxFilesToFetch

      if (results.isEmpty || limitExceeded) {
        if (limitExceeded) logger.warn(s"Exiting early from getting online files, because we have found more than maxFilesToFetch: $maxFilesToFetch, namely ${accumulated.length} files")
        Future.successful(accumulated)
      } else {
        recursivelyGetFilesOfProject(projectId, start + results.size, pageSize, accumulated)
      }
    }

  /**
   * Requests a single page of items whose `gnm_containing_projects` field matches the given project, via a PUT
   * to the Vidispine `/API/search` endpoint.
   * @param projectId project whose files are wanted
   * @param currentItem value sent as the `first` matrix parameter, i.e. the offset of the first item of the page
   * @param pageSize value sent as the `number` matrix parameter, i.e. the number of items requested
   * @return A Future containing one element per search entry (as produced by `VSOnlineOutputMessage.fromResponseItem`),
   *         or an empty sequence if Vidispine returned no document
   */
  def getPageOfFilesOfProject(projectId:Int, currentItem:Int = 1, pageSize: Int = 100): Future[Seq[Option[VSOnlineOutputMessage]]] = {
    import io.circe.generic.auto._

    val doc =
      <ItemSearchDocument xmlns="http://xml.vidispine.com/schema/vidispine">
        <field>
          <name>gnm_containing_projects</name>
          <value>{projectId}</value>
        </field>
        <intervals>generic</intervals>
      </ItemSearchDocument>

    val searchResult = callToVidispine[SearchResultDocument](
      HttpRequest(
        uri = s"${config.baseUri}/API/search;first=$currentItem;number=$pageSize?content=shape,metadata&tag=original&field=title,gnm_category,gnm_containing_projects,gnm_nearline_id,itemId",
        method = HttpMethods.PUT,
        entity = HttpEntity(ContentType(MediaTypes.`application/xml`, HttpCharsets.`UTF-8`), doc.toString)
      )
    )

    searchResult.map {
      case Some(searchResultDocument) =>
        searchResultDocument.entry.map(simplifiedItem => VSOnlineOutputMessage.fromResponseItem(simplifiedItem, projectId))
      case None =>
        Seq.empty[Option[VSOnlineOutputMessage]]
    }
  }
}

object VidispineCommunicator {
  /** Kinds of Vidispine resource. Not referenced elsewhere in this file. */
  object ResourceType extends Enumeration {
    val Poster, Thumbnail = Value
  }
}