app/helpers/S3ToArchiveEntryFlow.scala (101 lines of code) (raw):

package helpers

import akka.stream.alpakka.s3.ListBucketResultContents
import java.time.{ZoneId, ZonedDateTime}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.alpakka.s3.scaladsl._
import akka.stream.scaladsl._
import akka.stream.stage.{AbstractInHandler, AbstractOutHandler, GraphStage, GraphStageLogic}
import com.theguardian.multimedia.archivehunter.common.clientManagers.{ESClientManager, S3ClientManager}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3Client}
import com.theguardian.multimedia.archivehunter.common.{ArchiveEntry, Indexer}
import com.theguardian.multimedia.archivehunter.common.cmn_models.ItemNotFound
import javax.inject.Inject
import play.api.{Configuration, Logger}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.control.NonFatal

/**
  * Akka Streams graph stage that converts S3 bucket listing entries ([[ListBucketResultContents]])
  * into [[ArchiveEntry]] instances, consulting the search index so that only entries which are
  * new, or which differ from the indexed copy (last-modified time, etag or storage class),
  * are emitted downstream.  Entries with no significant changes are dropped and the next
  * upstream element is pulled instead.
  *
  * @param s3ClientMgr     provides an S3 client for metadata lookups
  * @param config          Play configuration; reads `externalData.awsRegion`,
  *                        `externalData.awsProfile` and `externalData.indexName`
  * @param esClientManager provides the Elasticsearch client used by the implicit [[Indexer]]
  */
class S3ToArchiveEntryFlow @Inject() (s3ClientMgr: S3ClientManager, config:Configuration, esClientManager:ESClientManager)
  extends GraphStage[FlowShape[ListBucketResultContents, ArchiveEntry]] {

  final val in:Inlet[ListBucketResultContents] = Inlet.create("S3ToArchiveEntry.in")
  final val out:Outlet[ArchiveEntry] = Outlet.create("S3ToArchiveEntry.out")

  override def shape: FlowShape[ListBucketResultContents, ArchiveEntry] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    //over-riding the element name to provide the region is a bit hacky but it works.
    val region = inheritedAttributes.nameOrDefault(config.getOptional[String]("externalData.awsRegion").getOrElse("eu-west-1"))

    implicit val s3Client:S3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"),Some(Region.of(region)))
    private implicit val indexer = new Indexer(config.get[String]("externalData.indexName"))
    private implicit val esClient = esClientManager.getClient()
    private val logger=Logger(getClass)

    logger.debug("initialised new instance")

    /**
      * returns an updated entry if there are significant differences.
      * Significant differences are: last-modified time, etag, or storage class.
      * The comparisons are applied in sequence, each building on the previous
      * result, so multiple changed fields are all carried into the output.
      * @param existingEntry the entry as currently held in the index
      * @param newEntry the freshly-built entry from the live S3 object
      * @return Some(updated entry) if anything changed, None if the entries match
      */
    def updateIfNecessary(existingEntry:ArchiveEntry, newEntry:ArchiveEntry):Option[ArchiveEntry] = {
      val firstUpdate = if(existingEntry.last_modified.toLocalDateTime!=newEntry.last_modified.toLocalDateTime){
        logger.info(s"last_modified time updated on ${existingEntry.path} from ${existingEntry.last_modified.toLocalDateTime} to ${newEntry.last_modified.toLocalDateTime}")
        existingEntry.copy(last_modified = newEntry.last_modified)
      } else {
        existingEntry
      }

      val secondUpdate = if(existingEntry.etag!=newEntry.etag){
        logger.info(s"etag updated on ${existingEntry.location}")
        firstUpdate.copy(etag = newEntry.etag)
      } else {
        firstUpdate
      }

      val thirdUpdate = if(existingEntry.storageClass!=newEntry.storageClass){
        logger.info(s"storage class updated on ${existingEntry.location}")
        secondUpdate.copy(storageClass = newEntry.storageClass)
      } else {
        secondUpdate
      }

      //case-class equality: if nothing above produced a copy, the chain still equals the original
      if(thirdUpdate==existingEntry){
        logger.info(s"No differences on ${existingEntry.location}")
        None
      } else {
        logger.info(s"Updates detected on ${existingEntry.location}")
        Some(thirdUpdate)
      }
    }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        logger.debug(s"got element $elem")
        try {
          //we need to do a metadata lookup to get the MIME type anyway, so we may as well just call out here.
          //it appears that you can't push() to a port from in a Future thread, so doing it the crappy way and blocking here.
          //NOTE(review): the idiomatic non-blocking route is getAsyncCallback; kept as blocking to preserve behaviour.
          val mappedElem = ArchiveEntry.fromS3Sync(elem.bucketName, elem.key, None, region)
          logger.debug(s"Mapped $elem to $mappedElem")
          val maybeExistingEntry = ArchiveEntry.fromIndexFull(elem.bucketName, elem.key)

          val toUpdateFuture = maybeExistingEntry.map({
            case Right(existingEntry)=>
              logger.info(s"Found existing entry for s3://${elem.bucketName}/${elem.key} at ${existingEntry.id}")
              updateIfNecessary(existingEntry, mappedElem)
            case Left(ItemNotFound(itemId))=>
              logger.info(s"No existing item found for $itemId")
              Some(mappedElem)
            case Left(err)=>
              //NOTE: an index-lookup error silently drops this item from the stream (only logged here);
              //kept as-is so a transient ES failure does not abort a whole bucket scan.
              logger.error(s"Could not check existing archive entry: $err")
              None
          })

          Await.result(toUpdateFuture, 30 seconds) match {
            case Some(elemToUpdate)=>
              push(out, elemToUpdate)
            case None=>
              logger.info(s"Nothing to update on ${mappedElem.location}, grabbing next item")
              pull(in)
          }
        } catch {
          //was `case ex:Throwable` — NonFatal lets OutOfMemoryError/InterruptedException etc. propagate
          //to the materializer instead of being swallowed into failStage.
          case NonFatal(ex)=>
            logger.error(s"Could not create ArchiveEntry for s3://${elem.bucketName}/${elem.key}: ", ex)
            failStage(ex)
        }
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = {
        logger.debug("pull from downstream")
        pull(in)
      }
    })
  }
}