in common/src/main/scala/tracking/NotificationReportRepository.scala [34:81]
override def getByTypeWithDateRange(notificationType: NotificationType, from: DateTime, to: DateTime): Future[RepositoryResult[List[NotificationReport]]] = {
if (Days.daysBetween(from, to).getDays > 31) {
return Future.successful(Left(RepositoryError("Date range too big to query")))
}
def maybeStartKey(result: QueryResult): Option[util.Map[String, AttributeValue]] = Option(result.getLastEvaluatedKey).flatMap(lastKey => if (lastKey.isEmpty) None else Some(lastKey))
def reportsFromResult(result: QueryResult): RepositoryResult[List[NotificationReport]] = {
val results = result.getItems.asScala.toList.map { item =>
fromAttributeMap[NotificationReport](item.asScala.toMap)
}
val error = results.collectFirst {
case JsError(errors) => Left(RepositoryError(s"Unable to parse notification report $errors"))
}
val reports = results.collect{
case JsSuccess(report, _) => report
}
error.getOrElse(Right(reports))
}
def buildDynamoQuery(startKey: Option[util.Map[String, AttributeValue]]): QueryRequest = new QueryRequest(tableName)
.withIndexName(SentTimeIndex)
.withKeyConditions(Map(
TypeField -> keyEquals(notificationType.value),
SentTimeField -> keyBetween(from.toString, to.toString)
).asJava)
.withExclusiveStartKey(startKey.orNull)
def fetch(
startKey: Option[util.Map[String, AttributeValue]] = None,
lastListResult: RepositoryResult[List[NotificationReport]] = Right(Nil)
): Future[RepositoryResult[List[NotificationReport]]] = {
client.query(buildDynamoQuery(startKey)) flatMap { result =>
val reports = for {
lastList <- lastListResult
fetched <- reportsFromResult(result)
} yield lastList ++ fetched
maybeStartKey(result) match {
case None => Future.successful(reports)
case Some(newStartKey) => fetch(Some(newStartKey), reports)
}
}
}
fetch()
}