in lambda/src/main/scala/pricemigrationengine/handlers/SubscriptionIdUploadHandler.scala [69:89]
/** Streams a CSV of subscription ids into the cohort table.
  *
  * The input is decoded as UTF-8 and parsed with `csvFormat`; the first column of each record is
  * taken as the subscription id. For every id a [[CohortItem]] in the `ReadyForEstimation` state
  * is created. Ids rejected with a [[CohortItemAlreadyPresentFailure]] are logged and skipped
  * rather than treated as errors; any other failure is logged and fails the whole effect.
  *
  * @param inputStream
  *   raw CSV content; this method does not close it
  * @return
  *   the number of records that passed through the stream, which includes ids that were skipped
  *   because they were already in the table
  */
def writeSubscriptionIdsToCohortTable(inputStream: InputStream): ZIO[CohortTable with Logging, Failure, Long] = {

  // Per-id effect: create the item and log the import, tolerating "already present" failures.
  def importSubscription(id: String) =
    CohortTable
      .create(CohortItem(id, ReadyForEstimation))
      .zipLeft(Logging.info(s"Imported subscription $id"))
      .catchSome { case alreadyPresent: CohortItemAlreadyPresentFailure =>
        Logging.info(s"Ignored $id as already in table (DB error: ${alreadyPresent.reason})").unit
      }
      // Only failures that were not recovered above reach this point.
      .tapError(failure => Logging.error(s"Subscription $id failed: $failure"))

  ZStream
    .fromJavaIterator(
      CSVParser.parse(new InputStreamReader(inputStream, "UTF-8"), csvFormat).iterator()
    )
    .mapBoth(
      readError => SubscriptionIdUploadFailure(s"Failed to read subscription csv stream: $readError"),
      record => record.get(0) // first column holds the subscription id
    )
    .mapZIO(id => importSubscription(id))
    .runCount
}