app/helpers/DDBSink.scala:

package helpers

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import akka.stream.{Attributes, Inlet, Materializer, SinkShape}
import akka.stream.stage.{AbstractInHandler, GraphStage, GraphStageLogic}
import com.theguardian.multimedia.archivehunter.common.clientManagers.DynamoClientManager
import org.scanamo.{ScanamoAlpakka, Table}
import org.scanamo.syntax._
import org.scanamo.generic.auto._
import com.theguardian.multimedia.archivehunter.common.{ProxyLocation, ProxyLocationEncoder}
import javax.inject.Inject
import play.api.{Configuration, Logger}

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/**
  * Akka Streams sink stage that buffers incoming ProxyLocation records and writes them
  * to DynamoDB in batches via Scanamo, flushing any remainder when the stream terminates.
  */
final class DDBSink @Inject() (clientMgr: DynamoClientManager, config: Configuration)
                              (implicit system: ActorSystem, mat: Materializer)
  extends GraphStage[SinkShape[ProxyLocation]] with ProxyLocationEncoder {

  private val in: Inlet[ProxyLocation] = Inlet.create("DDBSink.in")

  override def shape: SinkShape[ProxyLocation] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    private val logger = Logger(getClass)

    var recordBuffer: Seq[ProxyLocation] = Seq()

    //number of records to accumulate before a batch is written out to DynamoDB
    val flushFrequency = config.getOptional[Int]("dynamodb.flushFrequency").getOrElse(100)

    val scanamoAlpakka = ScanamoAlpakka(clientMgr.getNewAsyncDynamoClient(config.getOptional[String]("externalData.awsProfile")))
    val tableName = config.getOptional[String]("proxies.tableName").getOrElse("archiveHunterProxies")
    val table = Table[ProxyLocation](tableName)

    /**
      * DynamoDB rejects a batch write whose items contain duplicate primary keys, so putAll
      * would fail if the buffer held more than one update for the same record.
      * This function keeps only the most recent entry for each (fileId, proxyType) key,
      * so that we know the update will succeed.
      * @return Iterable of unique ProxyLocation objects
      */
    def dedupeRecordBuffer: Iterable[ProxyLocation] = {
      //toMap keeps the last value seen for each key, i.e. the newest record wins
      val recordBufferMap = recordBuffer.map(loc => (loc.fileId, loc.proxyType) -> loc).toMap
      recordBufferMap.values
    }

    setHandler(in, new AbstractInHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        recordBuffer :+= elem

        if (recordBuffer.length >= flushFrequency) {
          //buffer is full; de-duplicate it and write the whole batch synchronously
          val dduped = dedupeRecordBuffer.toSeq
          logger.debug(s"Flushing ${dduped.length} records...")
          Await.result(
            scanamoAlpakka
              .exec(table.putAll(dduped.toSet))
              .runWith(Sink.head),
            1.minute)
          logger.debug("Flush completed")
          recordBuffer = Seq()
        } else {
          logger.debug("Buffering record")
        }
        pull(in)
      }
    })

    override def preStart(): Unit = {
      pull(in)  //start off the chain by signalling demand for the first element
    }

    override def postStop(): Unit = {
      //the stream has terminated; flush whatever is still buffered
      val dduped = dedupeRecordBuffer.toSeq
      logger.info(s"Stream ended. Flushing ${dduped.length} remaining records...")
      Await.result(
        scanamoAlpakka
          .exec(table.putAll(dduped.toSet))
          .runWith(Sink.head),
        1.minute)
      logger.debug("Flush completed")
      recordBuffer = Seq()
    }
  }
}
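
Because DDBSink is a GraphStage rather than a ready-made Sink, it has to be wrapped with Sink.fromGraph before it can terminate a stream. Below is a minimal wiring sketch, assuming the DynamoClientManager and Configuration instances are supplied by the caller (in the real application Guice provides them via @Inject); the object name, method name and `locations` parameter are illustrative, not part of the codebase.

// Illustrative usage sketch only; names here are hypothetical.
package helpers

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import com.theguardian.multimedia.archivehunter.common.ProxyLocation
import com.theguardian.multimedia.archivehunter.common.clientManagers.DynamoClientManager
import play.api.Configuration

object DDBSinkUsageSketch {
  def writeAll(clientMgr: DynamoClientManager, config: Configuration, locations: List[ProxyLocation])
              (implicit system: ActorSystem, mat: Materializer) =
    Source(locations)                                           //emit each record downstream
      .runWith(Sink.fromGraph(new DDBSink(clientMgr, config)))  //DDBSink batches and writes them

Note that each full-batch flush in onPush blocks the stage with Await.result for up to a minute, stalling the stream while the write completes, while the postStop flush ensures the final partial batch is written once the stream ends.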