override def findTokens()

in common/src/main/scala/db/SqlRegistrationRepository.scala [80:113]


  /**
    * Streams the distinct (token, platform, buildTier) combinations registered
    * for any of the given topics.
    *
    * Rows are read from the `registrations` table, restricted to `topics` and,
    * when `shardRange` is defined, to rows whose `shard` lies between
    * `shardRange.min` and `shardRange.max` (both bounds inclusive). Grouping by
    * every selected column collapses duplicate rows into one.
    *
    * Rows whose platform string is not recognised by `Platform.fromString` are
    * logged at error level and dropped from the output. A missing or
    * unrecognised build tier yields `None` for the build tier rather than
    * dropping the row.
    *
    * @param topics     topics to match against the `topic` column
    * @param shardRange optional inclusive shard bounds; no shard filter when `None`
    * @return a stream of harvested tokens; the query runs when the stream is
    *         evaluated, not when this method is called
    */
  override def findTokens(topics: NonEmptyList[String], shardRange: Option[Range]): Stream[F, HarvestedToken] = {
    val selectRegistrations = sql"""
        SELECT token, platform, buildTier
        FROM registrations
    """

    val filters = Fragments.whereAndOpt(
      Some(Fragments.in(fr"topic", topics)),
      shardRange.map(s => Fragments.and(fr"shard >= ${s.min}", fr"shard <= ${s.max}"))
    )

    val queryStatement = selectRegistrations ++ filters ++ fr"GROUP BY token, platform, buildTier"

    // Logged when the stream is built (i.e. on every call), before any row is fetched.
    logger.info(s"About to run query: $queryStatement")

    // NOTE: the stream itself is deliberately not logged here. It is a lazy
    // description of the query, so its string form says nothing about the rows.
    queryStatement
      .query[(String, String, Option[String])]
      .stream
      .transact(xa)
      .map { case (token, platformString, buildTierString) =>
        val maybePlatform = Platform.fromString(platformString)
        if (maybePlatform.isEmpty) {
          logger.error(s"Unknown platform in db $platformString")
        }
        val maybeBuildTier: Option[BuildTier] = buildTierString.flatMap(BuildTier.fromString)
        maybePlatform.map(platform => HarvestedToken(token, platform, maybeBuildTier))
      }
      // Drop the rows whose platform could not be parsed (already logged above).
      .collect { case Some(harvestedToken) => harvestedToken }
  }