app/helpers/ArchiveEntryVerifyFlow.scala (47 lines of code) (raw):

package helpers

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import com.theguardian.multimedia.archivehunter.common.clientManagers.S3ClientManager
import com.amazonaws.services.s3.AmazonS3
import com.theguardian.multimedia.archivehunter.common.ArchiveEntry
import javax.inject.Inject
import play.api.{Configuration, Logger}
import software.amazon.awssdk.regions.Region

import scala.util.{Failure, Success}

/**
  * Akka Streams flow stage that checks whether each incoming [[ArchiveEntry]] still exists in S3.
  *
  * Only existence is checked; the object's content is not compared. For every element received:
  *  - if the object exists, nothing is emitted and the next element is requested from upstream;
  *  - if the object does not exist, a copy of the entry with `beenDeleted = true` is emitted downstream;
  *  - if the existence check itself fails, the error is logged and the stage is failed with it.
  *
  * @param s3ClientMgr injected [[S3ClientManager]] instance, used to obtain an S3 client for each element
  * @param config      injected webapp configuration instance; read for `externalData.awsRegion` and
  *                    `externalData.awsProfile`
  */
class ArchiveEntryVerifyFlow @Inject() (s3ClientMgr: S3ClientManager, config: Configuration)
  extends GraphStage[FlowShape[ArchiveEntry, ArchiveEntry]] {

  import com.theguardian.multimedia.archivehunter.common.cmn_helpers.S3ClientExtensions._

  final val in: Inlet[ArchiveEntry] = Inlet[ArchiveEntry]("ArchiveEntryVerifyFlow.in")
  final val out: Outlet[ArchiveEntry] = Outlet[ArchiveEntry]("ArchiveEntryVerifyFlow.out")

  override def shape: FlowShape[ArchiveEntry, ArchiveEntry] = FlowShape(in, out)

  /** Configured AWS region name, falling back to "eu-west-1" when the setting is absent. */
  lazy val defaultRegion: String =
    config.getOptional[String]("externalData.awsRegion").getOrElse("eu-west-1")

  /** Optional AWS profile name from configuration, handed to the client manager for every lookup. */
  lazy val awsProfile: Option[String] =
    config.getOptional[String]("externalData.awsProfile")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val logger = Logger(getClass)
      logger.debug("initialised new instance")

      /**
        * Checks a single entry against S3 and drives the stage accordingly: skips it, emits it
        * flagged as deleted, or fails the stage.
        *
        * @param elem the entry just grabbed from the inlet
        */
      private def verify(elem: ArchiveEntry): Unit = {
        // The client is obtained per element, using the element's own region when it has one.
        implicit val s3Client =
          s3ClientMgr.getS3Client(awsProfile, elem.region.map(regionName => Region.of(regionName)))

        val existence = s3Client.doesObjectExist(elem.bucket, elem.path, elem.maybeVersion)

        existence match {
          case Failure(err) =>
            logger.error(s"Could not check object s3://${elem.bucket}/${elem.path} - ${err.getMessage}", err)
            failStage(err)
          case Success(false) =>
            logger.debug(s"Object s3://${elem.bucket}/${elem.path} does not exist - flagging as missing and passing on")
            push(out, elem.copy(beenDeleted = true))
          case Success(true) =>
            logger.debug(s"Object s3://${elem.bucket}/${elem.path} still exists, not passing.")
            // Nothing was emitted, so ask upstream for another element to satisfy downstream demand.
            pull(in)
        }
      }

      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = verify(grab(in))
      })

      setHandler(out, new AbstractOutHandler {
        // Downstream demand is forwarded straight to upstream.
        override def onPull(): Unit = pull(in)
      })
    }
}