in common/src/main/scala/db/SqlRegistrationRepository.scala [80:113]
/**
 * Streams the distinct (token, platform, buildTier) registrations that are
 * subscribed to at least one of the given topics.
 *
 * Rows whose platform string is not recognised by `Platform.fromString` are
 * logged at error level and dropped from the output. A build tier that is
 * absent, or that `BuildTier.fromString` does not recognise, is emitted as `None`.
 *
 * @param topics     topics to match; a registration qualifies if its topic is any of these
 * @param shardRange optional shard restriction; both bounds (`min` and `max`) are inclusive
 * @return a stream of harvested tokens, one per distinct (token, platform, buildTier) row
 */
override def findTokens(topics: NonEmptyList[String], shardRange: Option[Range]): Stream[F, HarvestedToken] = {
  val topicFilter = Fragments.in(fr"topic", topics)
  val shardFilter = shardRange.map(s => Fragments.and(fr"shard >= ${s.min}", fr"shard <= ${s.max}"))

  // GROUP BY over every selected column collapses registrations that differ only by topic.
  val queryStatement =
    sql"""
SELECT token, platform, buildTier
FROM registrations
""" ++
      Fragments.whereAndOpt(Some(topicFilter), shardFilter) ++
      fr"GROUP BY token, platform, buildTier"

  logger.info(s"About to run query: $queryStatement")

  // The stream below is only a description of the query: no rows exist until it is
  // consumed, so there is nothing meaningful to log about the "result" at this point.
  queryStatement
    .query[(String, String, Option[String])]
    .stream
    .transact(xa)
    .map { case (token, platformString, buildTierString) =>
      Platform.fromString(platformString) match {
        case Some(platform) =>
          Some(HarvestedToken(token, platform, buildTierString.flatMap(BuildTier.fromString)))
        case None =>
          logger.error(s"Unknown platform in db $platformString")
          None
      }
    }
    .collect { case Some(harvestedToken) => harvestedToken }
}