def writeSubscriptionIdsToCohortTable()

in lambda/src/main/scala/pricemigrationengine/handlers/SubscriptionIdUploadHandler.scala [69:89]


  /** Imports the subscription ids found in a CSV stream into the cohort table.
    *
    * The first column of every CSV record is used as the subscription id, and a `CohortItem` in the
    * `ReadyForEstimation` state is created for it. When creation fails with a
    * `CohortItemAlreadyPresentFailure`, the id is logged as ignored and processing continues. Any
    * other failure is logged as an error and fails the resulting effect.
    *
    * @param inputStream CSV content; read as UTF-8 and parsed with `csvFormat`
    * @return the number of CSV records processed, including those ignored as already present
    */
  def writeSubscriptionIdsToCohortTable(inputStream: InputStream): ZIO[CohortTable with Logging, Failure, Long] = {
    // Declared as a `def` so the parser is constructed at the same point as if written inline below.
    def csvRecords = CSVParser.parse(new InputStreamReader(inputStream, "UTF-8"), csvFormat).iterator()

    // Stream of subscription ids: column 0 of each record; read errors become an upload failure.
    val subscriptionIds = ZStream
      .fromJavaIterator(csvRecords)
      .mapBoth(
        ex => SubscriptionIdUploadFailure(s"Failed to read subscription csv stream: $ex"),
        _.get(0)
      )

    subscriptionIds
      .mapZIO { subscriptionId =>
        // Create the cohort item, then report success; the result of `create` is what is yielded.
        val createAndReport = for {
          created <- CohortTable.create(CohortItem(subscriptionId, ReadyForEstimation))
          _ <- Logging.info(s"Imported subscription $subscriptionId")
        } yield created

        createAndReport
          .catchSome { case e: CohortItemAlreadyPresentFailure =>
            // An id that is already present is not an error: note it and move on.
            Logging.info(s"Ignored $subscriptionId as already in table (DB error: ${e.reason})").unit
          }
          .tapError(e => Logging.error(s"Subscription $subscriptionId failed: $e"))
      }
      .runCount
  }