override def shape: SinkShape[LightboxEntry] = SinkShape.of()

in app/helpers/LightboxStreamComponents/BulkRestoreStatsSink.scala [28:84]


  override def shape: SinkShape[LightboxEntry] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[BulkRestoreStats]) = {
    val logger = Logger(getClass)

    val completionPromise = Promise[BulkRestoreStats]()

    var currentStats:BulkRestoreStats = BulkRestoreStats.empty
    val logic = new GraphStageLogic(shape) {
      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)

          val result = Try { Await.result((glacierRestoreActor ? CheckRestoreStatus(elem)).mapTo[GRMsg], 60 seconds) }

          result match {
            case Failure(err)=>
              logger.error(s"Could not check archive status on ${elem.fileId}", err)
              failStage(err)
            case Success(ItemLost(_))=>
              logger.warn(s"${elem.fileId} has been lost!")
              currentStats = currentStats.copy(lost = currentStats.lost+1)
            case Success(NotInArchive(_))=>
              logger.info(s"${elem.fileId} is not in Glacier")
              currentStats = currentStats.copy(unneeded = currentStats.unneeded+1)
            case Success(RestoreNotRequested(_))=>
              logger.info(s"${elem.fileId} was not requested")
              currentStats = currentStats.copy(notRequested = currentStats.notRequested+1)
            case Success(RestoreInProgress(_))=>
              logger.info(s"${elem.fileId} is currently restoring")
              currentStats = currentStats.copy(inProgress = currentStats.inProgress+1)
            case Success(RestoreCompleted(_, _))=>
              logger.info(s"${elem.fileId} is available")
              currentStats = currentStats.copy(available = currentStats.available+1)
            case Success(RestoreFailure(err))=>
              logger.error(s"Could not check archive status on ${elem.fileId}", err)
              failStage(err)
            case Success(other)=>
              logger.error(s"Got unexpected message $other")
              failStage(new RuntimeException(s"Got unexpected message $other"))
          }

          pull(in)
        }
      })

      override def preStart(): Unit = {
        pull(in)
      }

      override def postStop(): Unit = {
        completionPromise.complete(Success(currentStats))
      }
    }

    (logic, completionPromise.future)
  }