in app/helpers/DDBSink.scala [24:86]
  override def shape: SinkShape[ProxyLocation] = SinkShape.of(in)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private val logger = Logger(getClass)

      // Accumulates incoming records until there are enough to flush as one batch write
      var recordBuffer: Seq[ProxyLocation] = Seq()
      val flushFrequency = config.getOptional[Int]("dynamodb.flushFrequency").getOrElse(100)
      val scanamoAlpakka = ScanamoAlpakka(clientMgr.getNewAsyncDynamoClient(config.getOptional[String]("externalData.awsProfile")))
      val tableName = config.getOptional[String]("proxies.tableName").getOrElse("archiveHunterProxies")
      val table = Table[ProxyLocation](tableName)
      /**
        * DynamoDB rejects a batch write that contains duplicate primary keys, so `putAll`
        * fails if the provided Set of items holds more than one record with the same
        * (fileId, proxyType) key. This function removes those duplicates, keeping the most
        * recently buffered record for each key, so that the update will succeed.
        * @return Iterable of unique ProxyLocation objects
        */
      def dedupeRecordBuffer: Iterable[ProxyLocation] = {
        // toMap keeps the last value seen for each duplicate key, so the most recent record wins
        val recordBufferMap = recordBuffer.map(loc => (loc.fileId, loc.proxyType) -> loc).toMap
        recordBufferMap.values
      }
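
      // A minimal sketch of the dedupe semantics (hypothetical field values; the real
      // ProxyLocation has more fields than shown here):
      //
      //   recordBuffer = Seq(
      //     ProxyLocation("f1", ProxyType.VIDEO, ...),  // dropped: same key as the record below
      //     ProxyLocation("f1", ProxyType.VIDEO, ...)   // kept: buffered later
      //   )
      //   dedupeRecordBuffer  // => only the second record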
      setHandler(in, new AbstractInHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          recordBuffer :+= elem
          if (recordBuffer.length >= flushFrequency) {
            val dduped = dedupeRecordBuffer.toSeq
            logger.debug(s"Flushing ${dduped.length} records...")
            // Blocks the stage until the batch write completes; since pull(in) only
            // happens afterwards, this also backpressures upstream during a flush
            Await.result(
              scanamoAlpakka
                .exec(table.putAll(dduped.toSet))
                .runWith(Sink.head),
              1.minute)
            logger.debug("Flush completed")
            recordBuffer = Seq()
          } else {
            logger.debug("Buffering record")
          }
          pull(in)
        }
      })
      override def preStart(): Unit = {
        pull(in) //start off the chain...
      }
      override def postStop(): Unit = {
        // Flush whatever is still buffered when the stream terminates
        val dduped = dedupeRecordBuffer.toSeq
        logger.info(s"Stream ended. Flushing ${dduped.length} remaining records...")
        Await.result(
          scanamoAlpakka
            .exec(table.putAll(dduped.toSet))
            .runWith(Sink.head),
          1.minute)
        logger.debug("Flush completed")
        recordBuffer = Seq()
      }
    }
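
// A minimal wiring sketch (not part of the file above), assuming the enclosing class
// is the DDBSink GraphStage suggested by the file name and that `config` and
// `clientMgr` arrive via its constructor (both assumptions; the constructor sits
// outside the [24:86] excerpt):
//
//   import akka.stream.scaladsl.{Sink, Source}
//
//   val proxyLocations: List[ProxyLocation] = ???  // hypothetical input
//   Source(proxyLocations)
//     .runWith(Sink.fromGraph(new DDBSink(config, clientMgr)))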