in common-lib/src/main/scala/com/gu/mediaservice/lib/aws/DynamoDB.scala [124:184]
/**
 * Adds a single string to the string set held under `key` on the item `id`.
 *
 * Builds an `ADD` update expression and delegates to `update`; the returned
 * future is whatever `update` produces for that expression.
 *
 * @param id    identifier of the item to update
 * @param key   attribute name that the value is added to
 * @param value string bound to the `:value` placeholder as a one-element string set
 */
def setAdd(id: String, key: String, value: String)
          (implicit ex: ExecutionContext): Future[JsObject] = {
  val addExpression = s"ADD $key :value"
  val boundValues = new ValueMap().withStringSet(":value", value)
  update(id, addExpression, boundValues)
}
/**
 * Adds every string in `value` to the string set held under `key` on the item `id`.
 *
 * Builds an `ADD` update expression and delegates to `update`; the returned
 * future is whatever `update` produces for that expression.
 *
 * @param id    identifier of the item to update
 * @param key   attribute name that the values are added to
 * @param value strings bound to the `:value` placeholder as a string set
 */
def setAdd(id: String, key: String, value: List[String])
          (implicit ex: ExecutionContext): Future[JsObject] = {
  val addExpression = s"ADD $key :value"
  // Expand the list into the varargs overload of withStringSet.
  val boundValues = new ValueMap().withStringSet(":value", value: _*)
  update(id, addExpression, boundValues)
}
/**
 * Looks up `key` for the item `id` via `get` and converts the resulting item
 * to JSON with `asJsObject`.
 *
 * @param id  identifier of the item to read
 * @param key attribute name passed through to `get`
 */
def jsonGet(id: String, key: String)
           (implicit ex: ExecutionContext): Future[JsValue] =
  for {
    fetched <- get(id, key)
  } yield asJsObject(fetched)
/**
 * Reads `attributeKey` for many items at once using DynamoDB BatchGetItem.
 *
 * The ids are de-duplicated, split into chunks of at most 100 keys, and each chunk
 * is fetched inside its own `Future`. Within a chunk, any keys the service reports
 * as unprocessed are requested again until none remain. The per-chunk results are
 * merged into a single map.
 *
 * Items for which `attributeKey` is absent, or cannot be read as a `T`, are left out
 * of the result rather than failing the whole call.
 *
 * @param ids          identifiers of the items to fetch; duplicates are ignored
 * @param attributeKey name of the attribute to extract from each item's JSON form
 * @return a map from item id to the parsed attribute value
 */
def batchGet(ids: List[String], attributeKey: String)
            (implicit ex: ExecutionContext, rjs: Reads[T]): Future[Map[String, T]] = {
  // BatchGetItem rejects a request that lists the same key twice. The result is keyed
  // by id anyway, so dropping duplicates up front does not change what callers get.
  val keyChunkList = ids
    .distinct
    .map(k => Map(IdKey -> new AttributeValue(k)).asJava)
    .grouped(100)

  Future.traverse(keyChunkList) { keyChunk => {
    val keysAndAttributes: KeysAndAttributes = new KeysAndAttributes().withKeys(keyChunk.asJava)

    // Issues the request, collects what came back, then recurses on whatever the
    // service left unprocessed. Terminates when the unprocessed-keys map is empty.
    @tailrec
    def nextPageOfBatch(request: java.util.Map[String, KeysAndAttributes], acc: List[(String, T)])
                       (implicit ex: ExecutionContext, rjs: Reads[T]): List[(String, T)] = {
      if (request.isEmpty) acc
      else {
        logger.info(s"Fetching records for $request")
        val response = client.batchGetItem(request)
        val responses = response.getResponses
        logger.info(s"Got responses of $responses")
        // java.util.Map#get yields null when the table has no entry in this response;
        // treat that as "no items on this page" instead of throwing.
        val results = Option(responses.get(tableName))
          .map(_.asScala.toList)
          .getOrElse(Nil)
          .flatMap(att => {
            val attributes: util.Map[String, AnyRef] = ItemUtils.toSimpleMapValue(att)
            logger.info(s"Obtained attributes of $attributes from response $att")
            val json = asJsObject(Item.fromMap(attributes))
            val maybeT = (json \ attributeKey).asOpt[T]
            logger.info(s"Obtained a T of $maybeT from json $json")
            maybeT.map(
              attributes.get(IdKey).toString -> _
            )
          })
        logger.info(s"Got $results for request")
        nextPageOfBatch(response.getUnprocessedKeys, acc ::: results)
      }
    }

    Future {
      nextPageOfBatch(Map(tableName -> keysAndAttributes).asJava, Nil).toMap
    }
  }}
  .map(chunkIterator => chunkIterator.fold(Map.empty)((acc, result) => acc ++ result))
}