override def shape: SourceShape[T] = SourceShape.of()

in app/services/migrationcomponents/VSProjectSource.scala [58:158]


  override def shape: SourceShape[T] = SourceShape.of(out)

  def listConverter(entry:JsValue):IndexedSeq[T]

  /**
    * isolate the Http request so we can mock it in testing
    * @param req HttpRequest to make
    * @return a Future containing the HttpResponse
    */
  def makeHttpRequest(req:HttpRequest) = Http().singleRequest(req)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private var cachedItems:Seq[T] = Seq()
    private var lastIndex=1

    /**
      * reads in and buffers the response body
      * @param response
      * @return
      */
    private def consumeBody(response:HttpResponse):Future[ByteString] = {
      response.entity.dataBytes.toMat(Sink.reduce[ByteString]((acc, elem)=>acc.concat(elem)))(Keep.right).run()
    }

    def getNextPage():Future[Either[Int, Option[T]]] = {
      val uri = s"$vsBaseUri/API/collection;first=$lastIndex;number=$pageSize?content=metadata&count=false"
      logger.debug(s"URI is $uri")
      val requestXml = <ItemSearchDocument xmlns="http://xml.vidispine.com/schema/vidispine">
        <field>
          <name>gnm_type</name>
          <value>{ gnmType }</value>
        </field>
      </ItemSearchDocument>
      logger.debug(s"requestXml is ${requestXml.toString()}")

      val auth = Authorization(BasicHttpCredentials(vsUser, vsPasswd))
      val accept = Accept(MediaRange(MediaTypes.`application/json`))

      val req = HttpRequest(HttpMethods.PUT, uri, Seq(auth, accept))
        .withEntity(ContentType.WithCharset(MediaTypes.`application/xml`, HttpCharset("UTF-8")(Seq())), requestXml.toString())
      makeHttpRequest(req).flatMap(response=>{
        if(response.status==StatusCodes.BAD_GATEWAY || response.status==StatusCodes.GATEWAY_TIMEOUT) {
          response.entity.discardBytes()
          logger.warn("Vidispine timed out when accessing, retrying...")
          Thread.sleep(2000)
          getNextPage()
        } else {
          consumeBody(response).map(serverBytes => {
            if (response.status != StatusCodes.OK) {
              logger.warn(s"Could not load items from Vidispine: ${serverBytes.utf8String}")
              Left(response.status.intValue())
            } else {
              val serverJson = Json.parse(serverBytes.toArray)
              logger.debug(serverJson.toString())

              val newProjects = (serverJson \ "collection").asOpt[JsValue].map(listConverter).getOrElse(IndexedSeq())
              if(newProjects.nonEmpty) {
                this.synchronized {
                  cachedItems = cachedItems ++ newProjects.tail
                  lastIndex += newProjects.length
                }
              }
              Right(newProjects.headOption)
            }
          })
        }
      })
    }

    val nextItemCb = createAsyncCallback[T](item=>push(out, item))
    val completedCb = createAsyncCallback[Unit](_=>complete(out))
    val errCb = createAsyncCallback[Throwable](err=>failStage(err))

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = {
        this.synchronized {
          cachedItems.headOption match {
            case Some(nextItem)=>
              cachedItems = cachedItems.tail
              nextItemCb.invoke(nextItem)
              return
            case None=>
          }
        }

        //we only get here if there are no items left in the list
        getNextPage().map({
          case Left(_)=>
            errCb.invoke(new RuntimeException("Could not communicate with Vidispine"))
          case Right(None)=>
            completedCb.invoke( () )
          case Right(Some(nextItem))=>
            nextItemCb.invoke(nextItem)
        }).recover({
          case err:Throwable=>
            logger.error("Vidispine request crashed: ", err)
            errCb.invoke(err)
        })
      }
    })
  }