override def shape: SinkShape[ProblemItem] = SinkShape.of()

in app/helpers/ProblemItemReproxySink.scala [19:73]


  override def shape: SinkShape[ProblemItem] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
    val completionPromise = Promise[Int]()

    val logic = new GraphStageLogic(shape) {
      private val logger = Logger(getClass)
      var ctr=0

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)

          val futureList = elem.verifyResults.map(verifyResult=>{
            if(verifyResult.wantProxy && !verifyResult.haveProxy.getOrElse(false)){
              logger.info(s"Need ${verifyResult.proxyType.toString} proxy for $elem")

              val requestType = verifyResult.proxyType match {
                case ProxyType.VIDEO=> (RequestType.PROXY, Some(ProxyType.VIDEO))
                case ProxyType.AUDIO=> (RequestType.PROXY, Some(ProxyType.AUDIO))
                case ProxyType.THUMBNAIL=> (RequestType.THUMBNAIL, None)
              }
              Some(proxyGenerators.requestProxyJob(requestType._1, elem.fileId, requestType._2))
            } else {
              logger.info(s"Don't need ${verifyResult.proxyType.toString} proxy for $elem")
              None
            }
          })

          val results = Await.result(Future.sequence(futureList.collect({case Some(fut)=>fut})), 10 seconds)
          val total = results.length
          val failures = results.collect({case Failure(err)=>err})
          if(failures.nonEmpty){
            logger.warn(s"${failures.length}/$total proxy requests failed")
            failures.foreach(err=>logger.error("Proxy request failed: ", err))
          }

          ctr+=1

          pull(in)
        }
      })

      override def preStart(): Unit =
      {
        pull(in)
      }

      override def postStop(): Unit = {
        completionPromise.complete(Success(ctr))
      }
    }

    (logic, completionPromise.future)
  }