app/models/OAuthTokenEntryDAO.scala (45 lines of code) (raw):

package models

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import org.scanamo._
import com.theguardian.multimedia.archivehunter.common.clientManagers.DynamoClientManager
import play.api.Configuration
import org.scanamo.syntax._
import org.scanamo.generic.auto._
import org.slf4j.LoggerFactory

import java.time.{Duration, ZonedDateTime}
import javax.inject.{Inject, Singleton}
import scala.concurrent.{ExecutionContext, Future}

/**
  * Data-access object for [[OAuthTokenEntry]] records, executed through Scanamo's Alpakka interpreter.
  *
  * The table name is read from the `oAuth.oAuthTokensTable` configuration key and falls back to
  * `oauth-tokens-table`. The optional `externalData.awsProfile` key is passed to the client manager
  * when the async DynamoDB client is created.
  *
  * @param config              application configuration
  * @param dynamoClientManager factory used to obtain the async DynamoDB client
  */
@Singleton
class OAuthTokenEntryDAO @Inject() (config: Configuration, dynamoClientManager: DynamoClientManager)
                                   (implicit actorSystem: ActorSystem, mat: Materializer) {
  private val logger = LoggerFactory.getLogger(getClass)

  // Lazy, so the client is only requested from the manager on first database access.
  private lazy val scanamoAlpakka = ScanamoAlpakka(
    dynamoClientManager.getNewAsyncDynamoClient(config.getOptional[String]("externalData.awsProfile"))
  )

  private val tableName = config.getOptional[String]("oAuth.oAuthTokensTable").getOrElse("oauth-tokens-table")
  private val table = Table[OAuthTokenEntry](tableName)

  implicit val ec: ExecutionContext = actorSystem.dispatcher

  type TokenReturnValue = List[Either[DynamoReadError, OAuthTokenEntry]]

  // Concatenates every batch emitted by a query stream into a single list.
  private val MakeOAuthTokenEntrySink =
    Sink.fold[TokenReturnValue, TokenReturnValue](List())((acc, batch) => acc ++ batch)

  /**
    * Queries the table for entries whose `userEmail` equals the given user and returns the first one.
    *
    * @param forUser email address of the user whose token is wanted
    * @return a Future containing the first entry returned by the query, or None if the query returned
    *         nothing. The Future fails with a RuntimeException if any returned row could not be read;
    *         each read error is logged first.
    */
  def lookupToken(forUser: String): Future[Option[OAuthTokenEntry]] =
    scanamoAlpakka
      .exec(table.query("userEmail" === forUser))
      .runWith(MakeOAuthTokenEntrySink)
      .flatMap { results =>
        val failures = results.collect { case Left(err) => err }
        if (failures.nonEmpty) {
          logger.error(s"${failures.length} operations failed when getting oauth refresh token for $forUser:")
          failures.foreach(f => logger.error(s"\t${f.toString}"))
          Future.failed(new RuntimeException(s"${failures.length} operations failed, see logs for details"))
        } else {
          // No failures means every element is a Right, so the first Right is the head of the results.
          // The value is already computed, so Future.successful avoids scheduling a task on the dispatcher.
          Future.successful(results.collectFirst { case Right(entry) => entry })
        }
      }

  /**
    * Deletes the row identified by the `userEmail` and `issued` values of the given token.
    *
    * @param token the entry to remove
    * @return a Future that completes with the first element emitted by the delete operation's stream
    */
  def removeUsedToken(token: OAuthTokenEntry) =
    scanamoAlpakka
      .exec(table.delete("userEmail" === token.userEmail and "issued" === token.issued))
      .runWith(Sink.head)

  /**
    * Makes a new token object from the provided data and saves it to the database.
    *
    * @param forUser  user that owns the token
    * @param issuedAt timestamp of issue; stored as seconds since the epoch
    * @param value    token value
    * @return a Future with the OAuthTokenEntry object. The future fails on error.
    */
  def saveToken(forUser: String, issuedAt: ZonedDateTime, value: String) = {
    val newEntry = OAuthTokenEntry(forUser, issuedAt.toInstant.getEpochSecond, value)
    scanamoAlpakka
      .exec(table.put(newEntry))
      .runWith(Sink.head)
      .map(_ => newEntry)
  }
}