in app/helpers/LightboxStreamComponents/LightboxDynamoSource.scala [28:129]
override def shape: SourceShape[LightboxEntry] = SourceShape.of(out)
private def getQueryRequestBuilder = QueryRequest.builder.tableName(config.get[String]("lightbox.tableName"))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
protected val client = dynamoClientManager.getClient(config.getOptional[String]("externalData.awsProfile"))
val logger = Logger(getClass)
private var lastEvaluatedKey:Option[mutable.Map[String, AttributeValue]] = None
private var resultCache:Seq[mutable.Map[String,AttributeValue]] = Seq()
private var lastPage = false
private var ctr = 0
/**
* retrieve next page of results from DynamoDB
* @param limit maximum number of items to return
* @param exclusiveStartKey key to start at. If more than `limit` items match, then the value for this is present in the previous
* page's result as `Option(scanResult.getLastEvaluatedKey.asScala)`
* @return a Try with either an AWS QueryResult object or an error
*/
def getNextPage(limit:Int,exclusiveStartKey:Option[mutable.Map[String, AttributeValue]]) = {
logger.info(s"getNextPage: memberOfBulk is $memberOfBulk")
val baseRq = getQueryRequestBuilder
.indexName("memberOfBulkIndex")
.limit(limit)
.keyConditionExpression(s"memberOfBulk = :bulkId")
.expressionAttributeValues(Map(":bulkId"-> AttributeValue.builder().s(memberOfBulk).build()).asJava)
val rqWithStart = exclusiveStartKey match {
case Some(key)=>baseRq.exclusiveStartKey(key.asJava)
case None=>baseRq
}
Try { client.query(rqWithStart.build()) }
}
/**
* helper method to safely return an Option[String] for the given key if it may not be present
* @param sourceData source data as returned by DynamoDB
* @param key key to check
* @return an Option that contains the value of the key, if one is present
*/
def optionalString(sourceData:mutable.Map[String, AttributeValue], key:String):Option[String] = {
sourceData.get(key).flatMap(attributeValue=>Option(attributeValue.s()))
}
/**
* marshals the provided data from DynamoDB into a LightboxEntry
* @param sourceData data as returned from DynamoDB
* @return a Try that either contains the LightboxEntry or an error that occurred while marshalling
*/
def buildLightboxEntry(sourceData: mutable.Map[String,AttributeValue]):Try[LightboxEntry] = Try {
LightboxEntry(
sourceData("userEmail").s(),
sourceData("fileId").s(),
ZonedDateTime.parse(sourceData("addedAt").s()),
RestoreStatus.withName(sourceData("restoreStatus").s()),
optionalString(sourceData,"restoreStarted").map(timeStr=>ZonedDateTime.parse(timeStr)),
optionalString(sourceData,"restoreCompleted").map(timeStr=>ZonedDateTime.parse(timeStr)),
optionalString(sourceData,"availableUntil").map(timeStr=>ZonedDateTime.parse(timeStr)),
optionalString(sourceData,"lastError"),
optionalString(sourceData,"memberOfBulk")
)
}
setHandler(out, new AbstractOutHandler {
override def onPull(): Unit = {
if(resultCache.isEmpty && !lastPage){ //if we are empty, then grab the next page of results
logger.info("cache is empty, getting next page of 100 results")
getNextPage(pageSize, lastEvaluatedKey) match {
case Success(scanResult)=>
logger.info(s"Got scan result with ${scanResult.count()} items")
lastEvaluatedKey = Option(scanResult.lastEvaluatedKey().asScala)
resultCache = scanResult.items().asScala.map(_.asScala).toSeq
ctr+=scanResult.count()
logger.info(s"Scan returned ${scanResult.count()} items, running total is now $ctr items total")
if(scanResult.count()<pageSize) {
logger.info(s"${scanResult.count()} items is less than page size of 100, assuming that all items have been returned")
lastPage = true
}
case Failure(err)=>
logger.error(s"Could not scan Dynamodb table: ", err)
failStage(err)
}
}
if(resultCache.isEmpty ){ //if we are still empty here then no items were added.
logger.info(s"No more results were returned")
complete(out)
} else {
buildLightboxEntry(resultCache.head) match {
case Success(entry) =>
push(out, entry)
resultCache = resultCache.tail
case Failure(err) =>
logger.error(s"Could not marshal lightbox entry data into domain object", err)
failStage(err)
} //match
} //if(resultCache.isEmpty)
} //onPull
}) //setHandler
} //new GraphStageLogic