in app/helpers/S3ToArchiveEntryFlow.scala [32:128]
/**
 * Builds the stage logic.
 *
 * For each incoming element (identified by its `bucketName` and `key`) this:
 *  1. builds a candidate [[ArchiveEntry]] from S3 metadata,
 *  2. looks up any existing entry for the same bucket/key in the index,
 *  3. emits the candidate if nothing is indexed yet, or an updated copy of the existing entry if it changed.
 * Elements with no relevant changes, or whose index lookup fails with anything other than `ItemNotFound`,
 * are dropped (logged) and the next element is pulled. A non-fatal exception while processing an
 * element fails the stage.
 */
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  new GraphStageLogic(shape) {
    private val logger = Logger(getClass)

    // Overriding the element name to provide the region is a bit hacky, but it works: the stage's
    // name attribute (if set) is used as the AWS region, falling back to externalData.awsRegion and
    // finally to eu-west-1.
    private val region = inheritedAttributes.nameOrDefault(config.getOptional[String]("externalData.awsRegion").getOrElse("eu-west-1"))
    private implicit val s3Client: S3Client = s3ClientMgr.getS3Client(config.getOptional[String]("externalData.awsProfile"), Some(Region.of(region)))
    private implicit val indexer: Indexer = new Indexer(config.get[String]("externalData.indexName"))
    private implicit val esClient = esClientManager.getClient()

    // Upper bound on how long a single index lookup may block this stage before failing it.
    private val indexLookupTimeout = 30.seconds

    logger.debug("initialised new instance")

    /**
     * Returns an updated entry if there are significant differences between what is indexed and what S3 reports.
     *
     * Only `last_modified`, `etag` and `storageClass` are compared. Any of them that differ are copied from
     * `newEntry` onto `existingEntry`; every other field of `newEntry` is ignored.
     *
     * @param existingEntry the entry currently held in the index
     * @param newEntry      the entry freshly built from S3 metadata
     * @return `Some(updated existing entry)` if any compared field changed, otherwise `None`
     */
    private def updateIfNecessary(existingEntry: ArchiveEntry, newEntry: ArchiveEntry): Option[ArchiveEntry] = {
      // NOTE(review): comparing toLocalDateTime ignores the zone/offset. If both sides are not always in
      // the same zone, comparing instants (toInstant) may be more accurate — confirm before changing.
      val withTimestamp =
        if (existingEntry.last_modified.toLocalDateTime != newEntry.last_modified.toLocalDateTime) {
          logger.info(s"last_modified time updated on ${existingEntry.path} from ${existingEntry.last_modified.toLocalDateTime} to ${newEntry.last_modified.toLocalDateTime}")
          existingEntry.copy(last_modified = newEntry.last_modified)
        } else {
          existingEntry
        }

      val withEtag =
        if (existingEntry.etag != newEntry.etag) {
          logger.info(s"etag updated on ${existingEntry.location}")
          withTimestamp.copy(etag = newEntry.etag)
        } else {
          withTimestamp
        }

      val withStorageClass =
        if (existingEntry.storageClass != newEntry.storageClass) {
          logger.info(s"storage class updated on ${existingEntry.location}")
          withEtag.copy(storageClass = newEntry.storageClass)
        } else {
          withEtag
        }

      if (withStorageClass == existingEntry) {
        logger.info(s"No differences on ${existingEntry.location}")
        None
      } else {
        logger.info(s"Updates detected on ${existingEntry.location}")
        Some(withStorageClass)
      }
    }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        logger.debug(s"got element $elem")
        try {
          // We need a metadata lookup to get the MIME type anyway, so build the candidate entry from S3 here.
          // push() must be called from the stage's own callbacks, not from a Future's thread, so the lookups
          // below block this handler. getAsyncCallback would be the non-blocking alternative.
          val mappedElem = ArchiveEntry.fromS3Sync(elem.bucketName, elem.key, None, region)
          logger.debug(s"Mapped $elem to $mappedElem")

          val toUpdateFuture = ArchiveEntry.fromIndexFull(elem.bucketName, elem.key).map {
            case Right(existingEntry) =>
              logger.info(s"Found existing entry for s3://${elem.bucketName}/${elem.key} at ${existingEntry.id}")
              updateIfNecessary(existingEntry, mappedElem)
            case Left(ItemNotFound(itemId)) =>
              logger.info(s"No existing item found for $itemId")
              Some(mappedElem)
            case Left(err) =>
              // Best-effort: skip this element rather than failing the whole stream.
              logger.error(s"Could not check existing archive entry: $err")
              None
          }

          Await.result(toUpdateFuture, indexLookupTimeout) match {
            case Some(elemToUpdate) =>
              push(out, elemToUpdate)
            case None =>
              // Nothing to emit for this element; downstream demand is still outstanding, so pull again.
              logger.info(s"Nothing to update on ${mappedElem.location}, grabbing next item")
              pull(in)
          }
        } catch {
          // NonFatal lets VM errors, interrupts and control-flow throwables propagate instead of being
          // converted into a stage failure here.
          case scala.util.control.NonFatal(ex) =>
            logger.error(s"Could not create ArchiveEntry for s3://${elem.bucketName}/${elem.key}: ", ex)
            failStage(ex)
        }
      }
    })

    setHandler(out, new AbstractOutHandler {
      override def onPull(): Unit = {
        logger.debug("pull from downstream")
        pull(in)
      }
    })
  }