app/mxscopy/MXSConnectionBuilderImpl.scala (99 lines of code) (raw):

package mxscopy import akka.actor.ActorSystem import com.om.mxs.client.japi.cred.Credentials import com.om.mxs.client.japi.{MatrixStore, MatrixStoreConnection, Vault} import org.slf4j.LoggerFactory import java.time.Instant import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} import scala.concurrent.duration._ /** * describes the interface of MXSConnectionBuilder, which safely creates and disposes MatrixStore connections */ trait MXSConnectionBuilder { def getConnection():Try[MatrixStore] def withVaultFuture[T](vaultId:String)(cb:Vault=>Future[Either[String, T]])(implicit ec:ExecutionContext):Future[Either[String,T]] } /** * Real implementation of MXSConnectionBuilder. This will call out to the appliance to retrieve vault references * @param hosts Appliance hostnames/IP addresses as an array of strings * @param clusterId Cluster id. * @param accessKeyId Access key id. for the service account to use to connect * @param accessKeySecret Access key secret for the service account to use to connect * @param maxIdleSeconds Timeout, in seconds, after which an idle connection will be cloased * @param loanLimitWarning Timeout, in seconds, after warnings will be emitted if a connection is still "in-use" */ class MXSConnectionBuilderImpl(hosts: Array[String], clusterId:String, accessKeyId:String, accessKeySecret:String, maxIdleSeconds:Int=300, loanLimitWarning:Int=14400)(implicit actorSystem: ActorSystem) extends MXSConnectionBuilder { //private vars are synchronised to object instance on access - that's why they need to be private and final private final var cachedConnection:Option[MatrixStore] = None private final var connectionLastRetrieved:Instant = Instant.now() private final var isInUse = false private implicit val ec:ExecutionContext = actorSystem.dispatcher private val logger = LoggerFactory.getLogger(getClass) def build() = Try { logger.debug(s"Building new MXS connection to $hosts") val credentials = Credentials.newAccessKeyCredentials(accessKeyId, accessKeySecret) val conn = MatrixStoreConnection.builder().withHosts(hosts).withClusterId(clusterId).build() MatrixStore.builder() .withConnection(conn) .withCredentials(credentials) .build() } def getConnection():Try[MatrixStore] = this.synchronized { cachedConnection match { case Some(connection)=> logger.debug("Using cached MXS datastore connection") connectionLastRetrieved = Instant.now() Success(connection) case None=> logger.debug("Building new MXS datastore connection") connectionLastRetrieved = Instant.now() build().map(mxs=>{ cachedConnection = Some(mxs) mxs }) } } actorSystem.getScheduler.scheduleAtFixedRate(30.seconds, 30.seconds)(new Runnable { override def run(): Unit = { this.synchronized { cachedConnection match { case None=> logger.debug("No current MXS connection") case Some(mxs)=> val idleTime = Instant.now().getEpochSecond - connectionLastRetrieved.getEpochSecond logger.debug(s"Idle time of cached connection is $idleTime seconds, in-use flag $isInUse") if(idleTime>=maxIdleSeconds && !isInUse) { logger.info(s"Terminating MXS connection as it has been idle for $idleTime seconds") mxs.dispose() cachedConnection = None } else if(idleTime>=loanLimitWarning && isInUse) { logger.warn(s"MXS connection is still marked as being-in use after $idleTime seconds, this is most likely a bug") } } } } }) /** * Initiates a connection to the configuration indicated by the builder, opens the given vault then runs the callback. * The callback is expected to return a Future of some type T. * Whether it succeeds or fails, the vault connection is then disposed. * Use this in preference to the static method if you have no pre-existing connection * @param vaultId Vault id. to open * @param cb Callback function which takes the Vault instance as a parameter and returns a Future of any type * @param ec Implicitly defined execution context * @tparam T The type returned by the callback's Future * @return Either the result of the callback, or a failed Try indicating some error in establishing the connection */ def withVaultFuture[T](vaultId:String)(cb: Vault => Future[Either[String, T]])(implicit ec:ExecutionContext) = { Future.fromTry(getConnection()).flatMap(mxs=>{ isInUse = true MXSConnectionBuilderImpl .withVaultFuture(mxs, vaultId)(cb) .andThen(_=>{ isInUse = false }) }) } /** * Exactly the same as [[withVaultFuture]] but takes in multiple vault IDs and opens a connection to each of them. * The sequence of Vault instances passed to the callback should be in the same order as the vault IDs passed to the * function. * @param vaultIds Sequence of strings representing the vault IDs to open. A failure is returned if any of these fail. * @param cb Callback function, which needs to take a sequence of Vault objects representing the open vaults and return a Future of some type * @param ec Implicitly provided execution context * @tparam T Data type returned in the Future of the callback * @return The result of the callback, or a failure if we were not able to establish the connection */ def withVaultsFuture[T](vaultIds:Seq[String])(cb: Seq[Vault]=>Future[T])(implicit ec:ExecutionContext) = { Future.fromTry(getConnection()).flatMap(mxs=>{ isInUse = true val result = Future .sequence(vaultIds.map(vid=>Future.fromTry(Try{mxs.openVault(vid)}))) .flatMap(cb) isInUse = false result }) } } object MXSConnectionBuilderImpl { private val logger = LoggerFactory.getLogger(getClass) @deprecated("From external code, you should be going via an instance of MXSConnectionBuilderImpl not the static object") def withVaultFuture[T](mxs:MatrixStore, vaultId: String)(cb: (Vault) => Future[Either[String, T]])(implicit ec:ExecutionContext) = { Try { mxs.openVault(vaultId) } match { case Success(vault) => cb(vault).andThen(_=>vault.dispose()) case Failure(err) => logger.error(s"Could not establish vault connection: ${err.getMessage}", err) Future(Left(err.toString)) } } }