in app/helpers/LightboxStreamComponents/BulkRestoreStatsSink.scala [28:84]
override def shape: SinkShape[LightboxEntry] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[BulkRestoreStats]) = {
val logger = Logger(getClass)
val completionPromise = Promise[BulkRestoreStats]()
var currentStats:BulkRestoreStats = BulkRestoreStats.empty
val logic = new GraphStageLogic(shape) {
setHandler(in, new AbstractInHandler {
override def onPush(): Unit = {
val elem = grab(in)
val result = Try { Await.result((glacierRestoreActor ? CheckRestoreStatus(elem)).mapTo[GRMsg], 60 seconds) }
result match {
case Failure(err)=>
logger.error(s"Could not check archive status on ${elem.fileId}", err)
failStage(err)
case Success(ItemLost(_))=>
logger.warn(s"${elem.fileId} has been lost!")
currentStats = currentStats.copy(lost = currentStats.lost+1)
case Success(NotInArchive(_))=>
logger.info(s"${elem.fileId} is not in Glacier")
currentStats = currentStats.copy(unneeded = currentStats.unneeded+1)
case Success(RestoreNotRequested(_))=>
logger.info(s"${elem.fileId} was not requested")
currentStats = currentStats.copy(notRequested = currentStats.notRequested+1)
case Success(RestoreInProgress(_))=>
logger.info(s"${elem.fileId} is currently restoring")
currentStats = currentStats.copy(inProgress = currentStats.inProgress+1)
case Success(RestoreCompleted(_, _))=>
logger.info(s"${elem.fileId} is available")
currentStats = currentStats.copy(available = currentStats.available+1)
case Success(RestoreFailure(err))=>
logger.error(s"Could not check archive status on ${elem.fileId}", err)
failStage(err)
case Success(other)=>
logger.error(s"Got unexpected message $other")
failStage(new RuntimeException(s"Got unexpected message $other"))
}
pull(in)
}
})
override def preStart(): Unit = {
pull(in)
}
override def postStop(): Unit = {
completionPromise.complete(Success(currentStats))
}
}
(logic, completionPromise.future)
}