app/actors/ObjectCache.scala (104 lines of code) (raw):

package actors import java.util.UUID import java.util.concurrent.TimeUnit import actors.ObjectCache.{CacheEntry, ExpiryTick, Lookup, ObjectFound, ObjectLookupFailed, ObjectNotFound, UpdateCache} import akka.actor.{Actor, ActorRef, ActorSystem} import akka.stream.Materializer import com.om.mxs.client.japi.{MatrixStore, SearchTerm, UserInfo, Vault} import helpers.{OMLocator, UserInfoBuilder, UserInfoCache} import javax.inject.{Inject, Singleton} import models.ObjectMatrixEntry import org.slf4j.LoggerFactory import play.api.Configuration import scala.collection.JavaConverters._ import scala.concurrent.Future import scala.util.{Failure, Success, Try} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ object ObjectCache { trait OCMsg /* incoming messages */ case class Lookup(locator:OMLocator) extends OCMsg /* private incoming messages */ case class UpdateCache(locator: OMLocator, entry:ObjectMatrixEntry) case object ExpiryTick /*outgoing messages*/ case class ObjectFound(locator:OMLocator, entry:ObjectMatrixEntry) extends OCMsg case class ObjectNotFound(locator: OMLocator) extends OCMsg case class ObjectLookupFailed(locator:OMLocator, msg:String) extends OCMsg /*private*/ case class CacheEntry(entry:ObjectMatrixEntry, lastUsed:Long) { def updated():CacheEntry = this.copy(lastUsed=java.time.Instant.now().getEpochSecond) } } /** * Caching lookup actor for ObjectMatrix files. * Send an Assk with the Lookup message passing an [[OMLocator]] instance and you will either get [[ObjectFound]] with the OID * or [[ObjectNotFound]] or [[ObjectLookupFailed]] in reply * Initiate this via guice dependency injection. * @param userInfoCache * @param config * @param system */ @Singleton class ObjectCache @Inject() (userInfoCache:UserInfoCache, config:Configuration)(implicit mat:Materializer, system:ActorSystem) extends Actor { private val logger = LoggerFactory.getLogger(getClass) //map of (vaultid,path)->oid protected var content:Map[(UUID,String),CacheEntry] = Map() protected val ownRef:ActorRef = self private val expiryTime:Duration = config.getOptional[String]("vaults.lookup-expiry-time-seconds").map(Duration.apply).getOrElse(Duration(1, TimeUnit.MINUTES)) protected def setupTimer() = system.scheduler.schedule(1.minute, 1.minute,ownRef,ExpiryTick) setupTimer() /** * locate files for the given filename, as stored in the metadata. This assumes that one or at most two records will * be returned and should therefore be more efficient than using the streaming interface. If many records are expected, * this will be inefficient and you should use the streaming interface. * this will return a Future to avoid blocking any other lookup requests that would hit the cache * @param fileName file name to search for * @return a Future, containing either a sequence of zero or more results as String oids or an error */ def findByFilename(userInfoBuilder:UserInfoBuilder, fileName:String):Future[Option[ObjectMatrixEntry]] = userInfoBuilder.getUserInfo.flatMap(userInfo=>Try { MatrixStore.openVault(userInfo) }) match { case Success(vault)=> implicit val vaultImpl = vault Future { logger.debug(s"Lookup $fileName on ${vault.getId}") val searchTerm = SearchTerm.createSimpleTerm("MXFS_FILENAME", fileName) //FIXME: check the metadata field namee val iterator = vault.searchObjectsIterator(searchTerm, 1).asScala var finalSeq: Seq[String] = Seq() while (iterator.hasNext) { //the iterator contains the OID finalSeq ++= Seq(iterator.next()) } if(finalSeq.length>1) logger.warn(s"Found ${finalSeq.length} object matching $fileName, only using the first") finalSeq.headOption match { case Some(oid)=>ObjectMatrixEntry(oid).getMetadata.map(entry=>Some(entry)) case None=>Future(None) } }.flatten.recover({ case err:Throwable=> logger.error(s"Failed to perform lookup on OM appliance: ", err) vault.dispose() throw err }) case Failure(err)=> logger.error(s"Could not open vault: ", err) Future.failed(err) } override def receive: Receive = { case UpdateCache(locator, oid)=> logger.debug(s"Updating cache with $oid for $locator") content = content ++ Map((locator.vaultId, locator.filePath)->CacheEntry(oid, java.time.Instant.now().getEpochSecond)) logger.debug(content.toString()) case ExpiryTick=> //purge out stale cache values logger.debug("expiry tick") val expiryThreshold = java.time.Instant.now().getEpochSecond - expiryTime.toSeconds content = content.filter(kv=>kv._2.lastUsed<expiryThreshold) case Lookup(locator)=> logger.debug(locator.toString) content.get((locator.vaultId, locator.filePath)) match { case Some(entry)=> logger.info(s"Cache hit for ${locator.filePath}") content = content ++ Map((locator.vaultId, locator.filePath)->entry.updated()) sender ! ObjectFound(locator, entry.entry) case None=> logger.info(s"Cache miss for ${locator.filePath}") val originalSender = sender() userInfoCache.infoForAddress(locator.host, locator.vaultId.toString) match { case None=> logger.info(s"No login information for vault ${locator.vaultId} on ${locator.host}") originalSender ! ObjectNotFound(locator) case Some(userInfo)=> findByFilename(userInfo, locator.filePath).onComplete({ case Failure(err)=> logger.error(s"Could not look up $locator on OM appliance: ", err) originalSender ! ObjectLookupFailed(locator, "Appliance lookup failed") case Success(maybeEntry)=> if(maybeEntry.isEmpty){ originalSender ! ObjectNotFound(locator) } else { ownRef ! UpdateCache(locator, maybeEntry.get) originalSender ! ObjectFound(locator, maybeEntry.get) } }) } } } }